Incremental loading migration (#1238)


## Description
Adds a relational database migration for incremental loading, and changes incremental
loading to process documents one at a time instead of gathering them asynchronously.
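
In essence, the run switches from scheduling one coroutine per data item and gathering them, to awaiting each item in turn. A minimal sketch of the two patterns, with an illustrative `process` function standing in for the real per-item pipeline run:

```python
import asyncio


async def process(item: str) -> str | None:
    # Stand-in for the real per-item pipeline run; returns None for skipped items.
    await asyncio.sleep(0)
    return item.upper() or None


async def run_concurrently(items: list[str]) -> list[str]:
    # Before: every item's pipeline runs at once and results arrive together.
    tasks = [asyncio.create_task(process(item)) for item in items]
    results = await asyncio.gather(*tasks)
    return [result for result in results if result]


async def run_sequentially(items: list[str]) -> list[str]:
    # After: each item is fully processed before the next one starts.
    results = []
    for item in items:
        result = await process(item)
        if result:
            results.append(result)
    return results


print(asyncio.run(run_sequentially(["a", "", "b"])))  # ['A', 'B']
```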

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.
Author: Igor Ilic, 2025-08-13 13:58:09 +02:00 (committed by GitHub)
Parent: fd9aaf57b1
Commit: beea2f5e0a
2 changed files with 90 additions and 17 deletions


@@ -0,0 +1,48 @@

```python
"""incremental_loading

Revision ID: 1daae0df1866
Revises: b9274c27a25a
Create Date: 2025-08-12 13:14:12.515935

"""

from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa
from sqlalchemy.ext.mutable import MutableDict

# revision identifiers, used by Alembic.
revision: str = "1daae0df1866"
down_revision: Union[str, None] = "b9274c27a25a"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def _get_column(inspector, table, name, schema=None):
    # Return the column description for `name` on `table`, or None if it is absent.
    for col in inspector.get_columns(table, schema=schema):
        if col["name"] == name:
            return col
    return None


def upgrade() -> None:
    conn = op.get_bind()
    insp = sa.inspect(conn)

    # If the column already exists, skip the migration (makes the upgrade idempotent).
    pipeline_status_column = _get_column(insp, "data", "pipeline_status")

    if not pipeline_status_column:
        op.add_column(
            "data",
            sa.Column(
                "pipeline_status",
                MutableDict.as_mutable(sa.JSON),
                nullable=False,
                server_default=sa.text("'{}'"),
            ),
        )


def downgrade() -> None:
    op.drop_column("data", "pipeline_status")
```
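
For context, `MutableDict.as_mutable(sa.JSON)` makes SQLAlchemy track in-place changes to the JSON dict, so individual pipeline status keys can be updated without reassigning the whole column value. A minimal sketch, assuming an illustrative `Data` model and status string (not the project's actual model definition); the migration itself is applied with the usual `alembic upgrade head`:

```python
import sqlalchemy as sa
from sqlalchemy.ext.mutable import MutableDict
from sqlalchemy.orm import DeclarativeBase, Session


class Base(DeclarativeBase):
    pass


class Data(Base):  # illustrative stand-in for the real "data" model
    __tablename__ = "data"
    id = sa.Column(sa.Integer, primary_key=True)
    pipeline_status = sa.Column(
        MutableDict.as_mutable(sa.JSON), nullable=False, server_default=sa.text("'{}'")
    )


engine = sa.create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    row = Data(pipeline_status={})
    session.add(row)
    session.commit()

    # In-place mutation is tracked because of MutableDict: the row is
    # marked dirty and the change is persisted on commit.
    row.pipeline_status["example_pipeline"] = "COMPLETED"
    session.commit()
```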


@@ -263,24 +263,49 @@ async def run_tasks(

```python
    if incremental_loading:
        data = await resolve_data_directories(data)

        # TODO: Return to using async.gather for data items after Cognee release
        # # Create async tasks per data item that will run the pipeline for the data item
        # data_item_tasks = [
        #     asyncio.create_task(
        #         _run_tasks_data_item(
        #             data_item,
        #             dataset,
        #             tasks,
        #             pipeline_name,
        #             pipeline_id,
        #             pipeline_run_id,
        #             context,
        #             user,
        #             incremental_loading,
        #         )
        #     )
        #     for data_item in data
        # ]
        # results = await asyncio.gather(*data_item_tasks)
        # # Remove skipped data items from results
        # results = [result for result in results if result]

        ### TEMP sync data item handling
        results = []

        # Run the pipeline for each data_item sequentially, one after the other
        for data_item in data:
            result = await _run_tasks_data_item(
                data_item,
                dataset,
                tasks,
                pipeline_name,
                pipeline_id,
                pipeline_run_id,
                context,
                user,
                incremental_loading,
            )

            # Skip items that returned a false-y value
            if result:
                results.append(result)
        ### END

        # Remove skipped data items from results
        results = [result for result in results if result]
```
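
When the TODO above is revisited and concurrent execution is restored, one possible middle ground is `asyncio.gather` with a concurrency cap, so several documents still run in parallel without launching every pipeline at once. A hedged sketch, not the project's plan; `process_item` and the limit are illustrative stand-ins for `_run_tasks_data_item` and a tuned value:

```python
import asyncio


async def process_item(item):
    # Stand-in for _run_tasks_data_item(...); returns None for skipped items.
    await asyncio.sleep(0)
    return item


async def run_bounded(items, limit: int = 4):
    semaphore = asyncio.Semaphore(limit)

    async def guarded(item):
        # At most `limit` items run the pipeline at the same time.
        async with semaphore:
            return await process_item(item)

    results = await asyncio.gather(*(guarded(item) for item in items))
    # Remove skipped data items from results, as in the original code.
    return [result for result in results if result]


print(asyncio.run(run_bounded(list(range(10)))))
```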