From 1a0978fb3764fb47619a0cf5a6881b7c1c70ae7e Mon Sep 17 00:00:00 2001 From: Daulet Amirkhanov Date: Fri, 17 Oct 2025 22:38:36 +0100 Subject: [PATCH] incremental loading - fallback to regular, update test cases --- .../operations/run_tasks_data_item.py | 74 +++++++++++-------- .../integration/web_url_crawler/test_add.py | 66 ++++++++++++++++- 2 files changed, 109 insertions(+), 31 deletions(-) diff --git a/cognee/modules/pipelines/operations/run_tasks_data_item.py b/cognee/modules/pipelines/operations/run_tasks_data_item.py index 152e72d7f..0118e7976 100644 --- a/cognee/modules/pipelines/operations/run_tasks_data_item.py +++ b/cognee/modules/pipelines/operations/run_tasks_data_item.py @@ -9,6 +9,7 @@ import os from typing import Any, Dict, AsyncGenerator, Optional from sqlalchemy import select +from cognee.infrastructure.files.exceptions import UnsupportedPathSchemeError import cognee.modules.ingestion as ingestion from cognee.infrastructure.databases.relational import get_relational_engine from cognee.infrastructure.files.utils.open_data_file import open_data_file @@ -63,36 +64,51 @@ async def run_tasks_data_item_incremental( # If incremental_loading of data is set to True don't process documents already processed by pipeline # If data is being added to Cognee for the first time calculate the id of the data - if not isinstance(data_item, Data): - file_path = await save_data_item_to_storage(data_item) - # Ingest data and add metadata - async with open_data_file(file_path) as file: - classified_data = ingestion.classify(file) - # data_id is the hash of file contents + owner id to avoid duplicate data - data_id = ingestion.identify(classified_data, user) - else: - # If data was already processed by Cognee get data id - data_id = data_item.id + try: + if not isinstance(data_item, Data): + file_path = await save_data_item_to_storage(data_item) + # Ingest data and add metadata + async with open_data_file(file_path) as file: + classified_data = ingestion.classify(file) + # data_id is the hash of file contents + owner id to avoid duplicate data + data_id = ingestion.identify(classified_data, user) + else: + # If data was already processed by Cognee get data id + data_id = data_item.id - # Check pipeline status, if Data already processed for pipeline before skip current processing - async with db_engine.get_async_session() as session: - data_point = ( - await session.execute(select(Data).filter(Data.id == data_id)) - ).scalar_one_or_none() - if data_point: - if ( - data_point.pipeline_status.get(pipeline_name, {}).get(str(dataset.id)) - == DataItemStatus.DATA_ITEM_PROCESSING_COMPLETED - ): - yield { - "run_info": PipelineRunAlreadyCompleted( - pipeline_run_id=pipeline_run_id, - dataset_id=dataset.id, - dataset_name=dataset.name, - ), - "data_id": data_id, - } - return + # Check pipeline status, if Data already processed for pipeline before skip current processing + async with db_engine.get_async_session() as session: + data_point = ( + await session.execute(select(Data).filter(Data.id == data_id)) + ).scalar_one_or_none() + if data_point: + if ( + data_point.pipeline_status.get(pipeline_name, {}).get(str(dataset.id)) + == DataItemStatus.DATA_ITEM_PROCESSING_COMPLETED + ): + yield { + "run_info": PipelineRunAlreadyCompleted( + pipeline_run_id=pipeline_run_id, + dataset_id=dataset.id, + dataset_name=dataset.name, + ), + "data_id": data_id, + } + return + except UnsupportedPathSchemeError as e: + logger.warning(f"data_item does not support incremental loading: {str(e)}") + # Fall back to regular processing since incremental loading is not supported + async for result in run_tasks_data_item_regular( + data_item=data_item, + dataset=dataset, + tasks=tasks, + pipeline_id=pipeline_id, + pipeline_run_id=pipeline_run_id, + context=context, + user=user, + ): + yield result + return try: # Process data based on data_item and list of tasks diff --git a/cognee/tests/integration/web_url_crawler/test_add.py b/cognee/tests/integration/web_url_crawler/test_add.py index 926c25a94..abd0d77ba 100644 --- a/cognee/tests/integration/web_url_crawler/test_add.py +++ b/cognee/tests/integration/web_url_crawler/test_add.py @@ -12,7 +12,6 @@ async def test_add_fails_when_preferred_loader_not_specified(): with pytest.raises(ValueError): await cognee.add( "https://en.wikipedia.org/wiki/Large_language_model", - incremental_loading=False, # TODO: incremental loading bypasses regular data ingestion, which breaks. Will fix ) @@ -42,7 +41,70 @@ async def test_add_succesfully_adds_url_when_preferred_loader_specified(): await cognee.add( "https://en.wikipedia.org/wiki/Large_language_model", preferred_loaders=["web_url_loader"], - incremental_loading=False, # TODO: incremental loading bypasses regular data ingestion, which breaks. Will fix + loaders_config=loaders_config, + ) + except Exception as e: + pytest.fail(f"Failed to add url: {e}") + + +@pytest.mark.asyncio +async def test_add_with_incremental_loading_works(): + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata=True) + + extraction_rules = { + "title": {"selector": "title"}, + "headings": {"selector": "h1, h2, h3", "all": True}, + "links": {"selector": "a", "attr": "href", "all": True}, + "paragraphs": {"selector": "p", "all": True}, + } + + loaders_config = { + "web_url_loader": { + "soup_config": { + "max_depth": 1, + "follow_links": False, + "extraction_rules": extraction_rules, + } + } + } + try: + await cognee.add( + "https://en.wikipedia.org/wiki/Large_language_model", + preferred_loaders=["web_url_loader"], + incremental_loading=True, + loaders_config=loaders_config, + ) + except Exception as e: + pytest.fail(f"Failed to add url: {e}") + + +@pytest.mark.asyncio +async def test_add_without_incremental_loading_works(): + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata=True) + + extraction_rules = { + "title": {"selector": "title"}, + "headings": {"selector": "h1, h2, h3", "all": True}, + "links": {"selector": "a", "attr": "href", "all": True}, + "paragraphs": {"selector": "p", "all": True}, + } + + loaders_config = { + "web_url_loader": { + "soup_config": { + "max_depth": 1, + "follow_links": False, + "extraction_rules": extraction_rules, + } + } + } + try: + await cognee.add( + "https://en.wikipedia.org/wiki/Large_language_model", + preferred_loaders=["web_url_loader"], + incremental_loading=False, loaders_config=loaders_config, ) except Exception as e: