diff --git a/cognee/api/v1/add/add.py b/cognee/api/v1/add/add.py index f14861d6d..3f921a25f 100644 --- a/cognee/api/v1/add/add.py +++ b/cognee/api/v1/add/add.py @@ -15,6 +15,7 @@ async def add( vector_db_config: dict = None, graph_db_config: dict = None, dataset_id: UUID = None, + incremental_loading: bool = True, ): """ Add data to Cognee for knowledge graph processing. @@ -153,6 +154,7 @@ async def add( pipeline_name="add_pipeline", vector_db_config=vector_db_config, graph_db_config=graph_db_config, + incremental_loading=incremental_loading, ): pipeline_run_info = run_info diff --git a/cognee/api/v1/cognify/cognify.py b/cognee/api/v1/cognify/cognify.py index af9dac8f1..aa2593981 100644 --- a/cognee/api/v1/cognify/cognify.py +++ b/cognee/api/v1/cognify/cognify.py @@ -39,6 +39,7 @@ async def cognify( vector_db_config: dict = None, graph_db_config: dict = None, run_in_background: bool = False, + incremental_loading: bool = True, ): """ Transform ingested data into a structured knowledge graph. 
@@ -194,6 +195,7 @@ async def cognify( datasets=datasets, vector_db_config=vector_db_config, graph_db_config=graph_db_config, + incremental_loading=incremental_loading, ) else: return await run_cognify_blocking( @@ -202,6 +204,7 @@ async def cognify( datasets=datasets, vector_db_config=vector_db_config, graph_db_config=graph_db_config, + incremental_loading=incremental_loading, ) @@ -211,6 +214,7 @@ async def run_cognify_blocking( datasets, graph_db_config: dict = None, vector_db_config: dict = False, + incremental_loading: bool = True, ): total_run_info = {} @@ -221,6 +225,7 @@ async def run_cognify_blocking( pipeline_name="cognify_pipeline", graph_db_config=graph_db_config, vector_db_config=vector_db_config, + incremental_loading=incremental_loading, ): if run_info.dataset_id: total_run_info[run_info.dataset_id] = run_info @@ -236,6 +241,7 @@ async def run_cognify_as_background_process( datasets, graph_db_config: dict = None, vector_db_config: dict = False, + incremental_loading: bool = True, ): # Store pipeline status for all pipelines pipeline_run_started_info = [] @@ -267,6 +273,7 @@ async def run_cognify_as_background_process( pipeline_name="cognify_pipeline", graph_db_config=graph_db_config, vector_db_config=vector_db_config, + incremental_loading=incremental_loading, ) # Save dataset Pipeline run started info diff --git a/cognee/modules/data/models/Data.py b/cognee/modules/data/models/Data.py index f9ae0f50f..56c7fa1b1 100644 --- a/cognee/modules/data/models/Data.py +++ b/cognee/modules/data/models/Data.py @@ -1,6 +1,7 @@ from datetime import datetime, timezone from uuid import uuid4 from sqlalchemy import UUID, Column, DateTime, String, JSON, Integer +from sqlalchemy.ext.mutable import MutableDict from sqlalchemy.orm import relationship from cognee.infrastructure.databases.relational import Base @@ -20,11 +21,14 @@ class Data(Base): owner_id = Column(UUID, index=True) content_hash = Column(String) external_metadata = Column(JSON) - node_set = Column(JSON, 
nullable=True) # Store NodeSet as JSON list of strings + # Store NodeSet as JSON list of strings + node_set = Column(JSON, nullable=True) + # MutableDict allows SQLAlchemy to notice key-value pair changes; without it, changing a value for a key + # wouldn't be noticed when committing a database session + pipeline_status = Column(MutableDict.as_mutable(JSON)) token_count = Column(Integer) created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)) updated_at = Column(DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc)) - pipeline_status = Column(JSON) datasets = relationship( "Dataset", diff --git a/cognee/modules/pipelines/operations/pipeline.py b/cognee/modules/pipelines/operations/pipeline.py index 6f23af1a5..e8135b645 100644 --- a/cognee/modules/pipelines/operations/pipeline.py +++ b/cognee/modules/pipelines/operations/pipeline.py @@ -27,7 +27,6 @@ from cognee.modules.data.methods import ( from cognee.modules.pipelines.models.PipelineRunInfo import ( PipelineRunCompleted, PipelineRunStarted, - PipelineRunErrored, ) from cognee.infrastructure.databases.relational import ( @@ -54,6 +53,7 @@ async def cognee_pipeline( pipeline_name: str = "custom_pipeline", vector_db_config: dict = None, graph_db_config: dict = None, + incremental_loading: bool = True, ): # Note: These context variables allow different value assignment for databases in Cognee # per async task, thread, process and etc.
@@ -108,6 +108,7 @@ async def cognee_pipeline( data=data, pipeline_name=pipeline_name, context={"dataset": dataset}, + incremental_loading=incremental_loading, ): yield run_info @@ -119,6 +120,7 @@ async def run_pipeline( data=None, pipeline_name: str = "custom_pipeline", context: dict = None, + incremental_loading=True, ): check_dataset_name(dataset.name) @@ -186,7 +188,9 @@ async def run_pipeline( if not isinstance(task, Task): raise ValueError(f"Task {task} is not an instance of Task") - pipeline_run = run_tasks(tasks, dataset_id, data, user, pipeline_name, context) + pipeline_run = run_tasks( + tasks, dataset_id, data, user, pipeline_name, context, incremental_loading + ) async for pipeline_run_info in pipeline_run: yield pipeline_run_info diff --git a/cognee/modules/pipelines/operations/run_tasks.py b/cognee/modules/pipelines/operations/run_tasks.py index 51a7c6adc..4a65faf51 100644 --- a/cognee/modules/pipelines/operations/run_tasks.py +++ b/cognee/modules/pipelines/operations/run_tasks.py @@ -4,6 +4,7 @@ import cognee.modules.ingestion as ingestion from uuid import UUID from typing import Any from functools import wraps +from sqlalchemy import select from cognee.infrastructure.databases.relational import get_relational_engine from cognee.modules.pipelines.operations.run_tasks_distributed import run_tasks_distributed @@ -60,6 +61,7 @@ async def run_tasks( user: User = None, pipeline_name: str = "unknown_pipeline", context: dict = None, + incremental_loading: bool = True, ): if not user: user = get_default_user() @@ -106,6 +108,17 @@ async def run_tasks( # If data was already processed by Cognee get data id data_id = data_item.id + # If incremental_loading is set to True don't process documents already processed by pipeline + if incremental_loading: + # Check pipeline status, if Data already processed for pipeline before skip current processing + async with db_engine.get_async_session() as session: + data_point = ( + await 
session.execute(select(Data).filter(Data.id == data_id)) + ).scalar_one_or_none() + if data_point: + if data_point.pipeline_status.get(pipeline_name) == "Completed": + break + try: async for result in run_tasks_with_telemetry( tasks=tasks, @@ -130,6 +143,15 @@ async def run_tasks( "data_id": data_id, } + # Update pipeline status for Data element + async with db_engine.get_async_session() as session: + data_point = ( + await session.execute(select(Data).filter(Data.id == data_id)) + ).scalar_one_or_none() + data_point.pipeline_status[pipeline_name] = "Completed" + await session.merge(data_point) + await session.commit() + except Exception as error: # Temporarily swallow error and try to process rest of documents first, then re-raise error at end of data ingestion pipeline ingestion_error = error diff --git a/cognee/tasks/ingestion/ingest_data.py b/cognee/tasks/ingestion/ingest_data.py index cbd3a8861..98914c613 100644 --- a/cognee/tasks/ingestion/ingest_data.py +++ b/cognee/tasks/ingestion/ingest_data.py @@ -131,6 +131,7 @@ async def ingest_data( content_hash=file_metadata["content_hash"], external_metadata=ext_metadata, node_set=json.dumps(node_set) if node_set else None, + pipeline_status={}, token_count=-1, )