From beea2f5e0a95bb5723435f439e0709c8365c3c55 Mon Sep 17 00:00:00 2001
From: Igor Ilic <30923996+dexters1@users.noreply.github.com>
Date: Wed, 13 Aug 2025 13:58:09 +0200
Subject: [PATCH] Incremental loading migration (#1238)

## Description
Add a relational DB migration for incremental loading, and change incremental loading to process documents one by one instead of concurrently via asyncio.gather.

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to the terms of the Topoteretes Developer Certificate of Origin.

---
 .../1daae0df1866_incremental_loading.py       | 48 +++++++++++++++
 .../modules/pipelines/operations/run_tasks.py | 59 +++++++++++++------
 2 files changed, 90 insertions(+), 17 deletions(-)
 create mode 100644 alembic/versions/1daae0df1866_incremental_loading.py

diff --git a/alembic/versions/1daae0df1866_incremental_loading.py b/alembic/versions/1daae0df1866_incremental_loading.py
new file mode 100644
index 000000000..4d8c3dde1
--- /dev/null
+++ b/alembic/versions/1daae0df1866_incremental_loading.py
@@ -0,0 +1,48 @@
+"""incremental_loading
+
+Revision ID: 1daae0df1866
+Revises: b9274c27a25a
+Create Date: 2025-08-12 13:14:12.515935
+
+"""
+
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.ext.mutable import MutableDict
+
+# revision identifiers, used by Alembic.
+revision: str = "1daae0df1866"
+down_revision: Union[str, None] = "b9274c27a25a"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def _get_column(inspector, table, name, schema=None):
+    for col in inspector.get_columns(table, schema=schema):
+        if col["name"] == name:
+            return col
+    return None
+
+
+def upgrade() -> None:
+    conn = op.get_bind()
+    insp = sa.inspect(conn)
+
+    # If the column already exists, skip the migration
+    pipeline_status_column = _get_column(insp, "data", "pipeline_status")
+    if not pipeline_status_column:
+        op.add_column(
+            "data",
+            sa.Column(
+                "pipeline_status",
+                MutableDict.as_mutable(sa.JSON),
+                nullable=False,
+                server_default=sa.text("'{}'"),
+            ),
+        )
+
+
+def downgrade() -> None:
+    op.drop_column("data", "pipeline_status")
diff --git a/cognee/modules/pipelines/operations/run_tasks.py b/cognee/modules/pipelines/operations/run_tasks.py
index cc52c947b..daee0f5dc 100644
--- a/cognee/modules/pipelines/operations/run_tasks.py
+++ b/cognee/modules/pipelines/operations/run_tasks.py
@@ -263,24 +263,49 @@ async def run_tasks(
     if incremental_loading:
         data = await resolve_data_directories(data)
 
-        # Create async tasks per data item that will run the pipeline for the data item
-        data_item_tasks = [
-            asyncio.create_task(
-                _run_tasks_data_item(
-                    data_item,
-                    dataset,
-                    tasks,
-                    pipeline_name,
-                    pipeline_id,
-                    pipeline_run_id,
-                    context,
-                    user,
-                    incremental_loading,
-                )
+        # TODO: Return to using asyncio.gather for data items after the Cognee release
+        # # Create async tasks per data item that will run the pipeline for the data item
+        # data_item_tasks = [
+        #     asyncio.create_task(
+        #         _run_tasks_data_item(
+        #             data_item,
+        #             dataset,
+        #             tasks,
+        #             pipeline_name,
+        #             pipeline_id,
+        #             pipeline_run_id,
+        #             context,
+        #             user,
+        #             incremental_loading,
+        #         )
+        #     )
+        #     for data_item in data
+        # ]
+        # results = await asyncio.gather(*data_item_tasks)
+        # # Remove skipped data items from results
+        # results = [result for result in results if result]
+
+        ### TEMP sync data item handling
+        results = []
+        # Run the pipeline for each data_item sequentially, one after the other
+        for data_item in data:
+            result = await _run_tasks_data_item(
+                data_item,
+                dataset,
+                tasks,
+                pipeline_name,
+                pipeline_id,
+                pipeline_run_id,
+                context,
+                user,
+                incremental_loading,
             )
-            for data_item in data
-        ]
-        results = await asyncio.gather(*data_item_tasks)
+
+            # Skip items that returned a falsy value
+            if result:
+                results.append(result)
+        ### END
+
         # Remove skipped data items from results
         results = [result for result in results if result]
 
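Note: once this patch is applied, the new revision runs as part of the usual `alembic upgrade head`. Below is a minimal sketch of how the migration could be verified against a local database; the engine URL is a placeholder and the script is illustrative, not part of the patch.

```python
import sqlalchemy as sa

# Placeholder URL; point this at the relational database Cognee is configured to use.
engine = sa.create_engine("sqlite:///cognee.db")

inspector = sa.inspect(engine)
columns = {col["name"]: col for col in inspector.get_columns("data")}

# After `alembic upgrade head`, the "data" table should have the JSON
# pipeline_status column added by revision 1daae0df1866, with a '{}' default.
assert "pipeline_status" in columns, "migration not applied"
print(columns["pipeline_status"]["type"])
```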
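On the run_tasks change: the sequential loop trades throughput for per-document isolation, so one failing data item no longer fails the whole asyncio.gather batch. When the TODO above is revisited, bounded concurrency is one possible middle ground between the two; the sketch below assumes `run_item` stands in for `_run_tasks_data_item` with its other arguments already bound (the helper and its names are illustrative, not part of the patch).

```python
import asyncio

async def run_items_bounded(data, run_item, max_concurrency: int = 4):
    # Limit how many data items are processed at once.
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(item):
        async with semaphore:
            return await run_item(item)

    results = await asyncio.gather(*(guarded(item) for item in data))
    # Mirror the patch: drop skipped (falsy) results.
    return [result for result in results if result]
```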