feat: Add incremental load

Igor Ilic 2025-07-10 21:58:11 +02:00
parent d98253c28c
commit 593cfcab5b
6 changed files with 44 additions and 4 deletions

View file

@@ -15,6 +15,7 @@ async def add(
     vector_db_config: dict = None,
     graph_db_config: dict = None,
     dataset_id: UUID = None,
+    incremental_loading: bool = True,
 ):
     """
     Add data to Cognee for knowledge graph processing.
@@ -153,6 +154,7 @@ async def add(
         pipeline_name="add_pipeline",
         vector_db_config=vector_db_config,
         graph_db_config=graph_db_config,
+        incremental_loading=incremental_loading,
     ):
         pipeline_run_info = run_info
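
Note: incremental_loading defaults to True, so existing add() callers get the skip-on-reprocess behavior without changes. A minimal usage sketch; the dataset_name parameter and file path are illustrative assumptions, not shown in this diff:

import asyncio
import cognee

async def main():
    # First call ingests the document and, per the run_tasks change below,
    # stamps its pipeline_status once the add pipeline completes.
    await cognee.add("data/report.txt", dataset_name="reports")
    # Default incremental_loading=True: re-adding already-completed content is skipped.
    await cognee.add("data/report.txt", dataset_name="reports")
    # Opt out to force full reprocessing of already-ingested data.
    await cognee.add("data/report.txt", dataset_name="reports", incremental_loading=False)

asyncio.run(main())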

View file

@@ -39,6 +39,7 @@ async def cognify(
     vector_db_config: dict = None,
     graph_db_config: dict = None,
     run_in_background: bool = False,
+    incremental_loading: bool = True,
 ):
     """
     Transform ingested data into a structured knowledge graph.
@@ -194,6 +195,7 @@ async def cognify(
             datasets=datasets,
             vector_db_config=vector_db_config,
             graph_db_config=graph_db_config,
+            incremental_loading=incremental_loading,
         )
     else:
         return await run_cognify_blocking(
@@ -202,6 +204,7 @@ async def cognify(
             datasets=datasets,
             vector_db_config=vector_db_config,
             graph_db_config=graph_db_config,
+            incremental_loading=incremental_loading,
         )
@@ -211,6 +214,7 @@ async def run_cognify_blocking(
    datasets,
    graph_db_config: dict = None,
    vector_db_config: dict = False,
+   incremental_loading: bool = True,
 ):
     total_run_info = {}
@@ -221,6 +225,7 @@ async def run_cognify_blocking(
         pipeline_name="cognify_pipeline",
         graph_db_config=graph_db_config,
         vector_db_config=vector_db_config,
+        incremental_loading=incremental_loading,
     ):
         if run_info.dataset_id:
             total_run_info[run_info.dataset_id] = run_info
@@ -236,6 +241,7 @@ async def run_cognify_as_background_process(
    datasets,
    graph_db_config: dict = None,
    vector_db_config: dict = False,
+   incremental_loading: bool = True,
 ):
     # Store pipeline status for all pipelines
     pipeline_run_started_info = []
@@ -267,6 +273,7 @@ async def run_cognify_as_background_process(
         pipeline_name="cognify_pipeline",
         graph_db_config=graph_db_config,
         vector_db_config=vector_db_config,
+        incremental_loading=incremental_loading,
     )
     # Save dataset Pipeline run started info
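
Note: the flag is threaded through both execution paths, so foreground (run_cognify_blocking) and background (run_cognify_as_background_process) runs behave identically. A hedged sketch of both call styles; the dataset names are illustrative:

import asyncio
import cognee

async def main():
    # Blocking path: waits for the cognify pipeline and skips completed documents.
    await cognee.cognify(datasets=["reports"], incremental_loading=True)
    # Background path: the same flag reaches cognee_pipeline, here with skipping disabled.
    await cognee.cognify(
        datasets=["reports"], run_in_background=True, incremental_loading=False
    )

asyncio.run(main())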

View file

@@ -1,6 +1,7 @@
 from datetime import datetime, timezone
 from uuid import uuid4
 from sqlalchemy import UUID, Column, DateTime, String, JSON, Integer
+from sqlalchemy.ext.mutable import MutableDict
 from sqlalchemy.orm import relationship
 from cognee.infrastructure.databases.relational import Base
@@ -20,11 +21,14 @@ class Data(Base):
     owner_id = Column(UUID, index=True)
     content_hash = Column(String)
     external_metadata = Column(JSON)
-    node_set = Column(JSON, nullable=True)  # Store NodeSet as JSON list of strings
+    # Store NodeSet as JSON list of strings
+    node_set = Column(JSON, nullable=True)
+    # MutableDict lets SQLAlchemy notice key-value changes; without it, changing
+    # the value for an existing key would go unnoticed when committing a database session
+    pipeline_status = Column(MutableDict.as_mutable(JSON))
     token_count = Column(Integer)
     created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
     updated_at = Column(DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc))
-    pipeline_status = Column(JSON)
     datasets = relationship(
         "Dataset",

View file

@@ -27,7 +27,6 @@ from cognee.modules.data.methods import (
 from cognee.modules.pipelines.models.PipelineRunInfo import (
     PipelineRunCompleted,
     PipelineRunStarted,
-    PipelineRunErrored,
 )
 from cognee.infrastructure.databases.relational import (
@@ -54,6 +53,7 @@ async def cognee_pipeline(
     pipeline_name: str = "custom_pipeline",
     vector_db_config: dict = None,
     graph_db_config: dict = None,
+    incremental_loading: bool = True,
 ):
     # Note: These context variables allow different value assignment for databases in Cognee
     # per async task, thread, process, etc.
@@ -108,6 +108,7 @@ async def cognee_pipeline(
                 data=data,
                 pipeline_name=pipeline_name,
                 context={"dataset": dataset},
+                incremental_loading=incremental_loading,
             ):
                 yield run_info
@@ -119,6 +120,7 @@ async def run_pipeline(
     data=None,
     pipeline_name: str = "custom_pipeline",
     context: dict = None,
+    incremental_loading=True,
 ):
     check_dataset_name(dataset.name)
@@ -186,7 +188,9 @@ async def run_pipeline(
         if not isinstance(task, Task):
             raise ValueError(f"Task {task} is not an instance of Task")

-    pipeline_run = run_tasks(tasks, dataset_id, data, user, pipeline_name, context)
+    pipeline_run = run_tasks(
+        tasks, dataset_id, data, user, pipeline_name, context, incremental_loading
+    )

     async for pipeline_run_info in pipeline_run:
         yield pipeline_run_info
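
Note: run_pipeline forwards incremental_loading to run_tasks as a seventh positional argument, matching the new parameter's position in run_tasks' signature. A toy reduction of this plumbing (hypothetical names, no database), showing how the flag threads through nested async generators:

import asyncio
from typing import AsyncIterator

async def run_tasks_demo(items: list[str], incremental_loading: bool = True) -> AsyncIterator[str]:
    already_done = {"a.txt"}  # stand-in for the pipeline_status lookup
    for item in items:
        if incremental_loading and item in already_done:
            continue  # skip work a previous run completed
        yield f"processed {item}"

async def run_pipeline_demo(items: list[str], incremental_loading: bool = True) -> AsyncIterator[str]:
    # Forward the flag unchanged, as run_pipeline -> run_tasks does above.
    async for info in run_tasks_demo(items, incremental_loading=incremental_loading):
        yield info

async def main():
    print([i async for i in run_pipeline_demo(["a.txt", "b.txt"])])         # ['processed b.txt']
    print([i async for i in run_pipeline_demo(["a.txt", "b.txt"], False)])  # both processed

asyncio.run(main())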

View file

@@ -4,6 +4,7 @@ import cognee.modules.ingestion as ingestion
 from uuid import UUID
 from typing import Any
 from functools import wraps
+from sqlalchemy import select
 from cognee.infrastructure.databases.relational import get_relational_engine
 from cognee.modules.pipelines.operations.run_tasks_distributed import run_tasks_distributed
@@ -60,6 +61,7 @@ async def run_tasks(
     user: User = None,
     pipeline_name: str = "unknown_pipeline",
     context: dict = None,
+    incremental_loading: bool = True,
 ):
     if not user:
         user = get_default_user()
@@ -106,6 +108,17 @@ async def run_tasks(
         # If the data was already processed by Cognee, get its data id
         data_id = data_item.id

+        # When incremental_loading is enabled, don't process documents this pipeline has already handled
+        if incremental_loading:
+            # Check the pipeline status; if this Data was completed by the pipeline before, skip processing
+            async with db_engine.get_async_session() as session:
+                data_point = (
+                    await session.execute(select(Data).filter(Data.id == data_id))
+                ).scalar_one_or_none()
+                if data_point:
+                    if data_point.pipeline_status.get(pipeline_name) == "Completed":
+                        break
+
         try:
             async for result in run_tasks_with_telemetry(
                 tasks=tasks,
@@ -130,6 +143,15 @@ async def run_tasks(
                     "data_id": data_id,
                 }

+            # Update the pipeline status for this Data element
+            async with db_engine.get_async_session() as session:
+                data_point = (
+                    await session.execute(select(Data).filter(Data.id == data_id))
+                ).scalar_one_or_none()
+                data_point.pipeline_status[pipeline_name] = "Completed"
+                await session.merge(data_point)
+                await session.commit()
+
         except Exception as error:
             # Temporarily swallow the error and try to process the rest of the documents first,
             # then re-raise it at the end of the data ingestion pipeline
             ingestion_error = error
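
Note: one sharp edge in the skip check above: Data rows created before this commit have pipeline_status = NULL, so data_point.pipeline_status.get(...) would raise AttributeError on them (new rows are safe because ingest_data seeds the column with {}, see the last file). A hedged sketch of the same check-then-mark pattern with a None guard; the helper names are mine and the Data import path is an assumption:

from sqlalchemy import select

from cognee.modules.data.models import Data  # assumed import path

async def pipeline_completed(session, data_id, pipeline_name: str) -> bool:
    data_point = (
        await session.execute(select(Data).filter(Data.id == data_id))
    ).scalar_one_or_none()
    # (pipeline_status or {}) guards rows that predate the new column
    return bool(
        data_point and (data_point.pipeline_status or {}).get(pipeline_name) == "Completed"
    )

async def mark_pipeline_completed(session, data_id, pipeline_name: str) -> None:
    data_point = (
        await session.execute(select(Data).filter(Data.id == data_id))
    ).scalar_one_or_none()
    if data_point is not None:
        if data_point.pipeline_status is None:
            data_point.pipeline_status = {}
        # MutableDict makes this in-place update visible at commit time
        data_point.pipeline_status[pipeline_name] = "Completed"
        await session.commit()

Since the row is loaded in the same session, commit alone persists the change; the diff's session.merge call is harmless but redundant in that case.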

View file

@@ -131,6 +131,7 @@ async def ingest_data(
                 content_hash=file_metadata["content_hash"],
                 external_metadata=ext_metadata,
                 node_set=json.dumps(node_set) if node_set else None,
+                pipeline_status={},
                 token_count=-1,
             )
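
Note: seeding pipeline_status={} here means every row ingested after this commit supports the .get(pipeline_name) check directly. The column's lifecycle for a fresh row (illustrative values):

status = {}                                     # as created by ingest_data
status.get("cognify_pipeline")                  # -> None, so the first run proceeds
status["cognify_pipeline"] = "Completed"        # stamped by run_tasks on success
status.get("cognify_pipeline") == "Completed"   # -> True, so the next run skips the row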