incremental loading - fallback to regular, update test cases

This commit is contained in:
Daulet Amirkhanov 2025-10-17 22:38:36 +01:00
parent a0f760a3d1
commit 1a0978fb37
2 changed files with 109 additions and 31 deletions

View file

@@ -9,6 +9,7 @@ import os
from typing import Any, Dict, AsyncGenerator, Optional from typing import Any, Dict, AsyncGenerator, Optional
from sqlalchemy import select from sqlalchemy import select
from cognee.infrastructure.files.exceptions import UnsupportedPathSchemeError
import cognee.modules.ingestion as ingestion import cognee.modules.ingestion as ingestion
from cognee.infrastructure.databases.relational import get_relational_engine from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.infrastructure.files.utils.open_data_file import open_data_file from cognee.infrastructure.files.utils.open_data_file import open_data_file
@@ -63,36 +64,51 @@ async def run_tasks_data_item_incremental(
# If incremental_loading of data is set to True don't process documents already processed by pipeline # If incremental_loading of data is set to True don't process documents already processed by pipeline
# If data is being added to Cognee for the first time calculate the id of the data # If data is being added to Cognee for the first time calculate the id of the data
if not isinstance(data_item, Data): try:
file_path = await save_data_item_to_storage(data_item) if not isinstance(data_item, Data):
# Ingest data and add metadata file_path = await save_data_item_to_storage(data_item)
async with open_data_file(file_path) as file: # Ingest data and add metadata
classified_data = ingestion.classify(file) async with open_data_file(file_path) as file:
# data_id is the hash of file contents + owner id to avoid duplicate data classified_data = ingestion.classify(file)
data_id = ingestion.identify(classified_data, user) # data_id is the hash of file contents + owner id to avoid duplicate data
else: data_id = ingestion.identify(classified_data, user)
# If data was already processed by Cognee get data id else:
data_id = data_item.id # If data was already processed by Cognee get data id
data_id = data_item.id
# Check pipeline status, if Data already processed for pipeline before skip current processing # Check pipeline status, if Data already processed for pipeline before skip current processing
async with db_engine.get_async_session() as session: async with db_engine.get_async_session() as session:
data_point = ( data_point = (
await session.execute(select(Data).filter(Data.id == data_id)) await session.execute(select(Data).filter(Data.id == data_id))
).scalar_one_or_none() ).scalar_one_or_none()
if data_point: if data_point:
if ( if (
data_point.pipeline_status.get(pipeline_name, {}).get(str(dataset.id)) data_point.pipeline_status.get(pipeline_name, {}).get(str(dataset.id))
== DataItemStatus.DATA_ITEM_PROCESSING_COMPLETED == DataItemStatus.DATA_ITEM_PROCESSING_COMPLETED
): ):
yield { yield {
"run_info": PipelineRunAlreadyCompleted( "run_info": PipelineRunAlreadyCompleted(
pipeline_run_id=pipeline_run_id, pipeline_run_id=pipeline_run_id,
dataset_id=dataset.id, dataset_id=dataset.id,
dataset_name=dataset.name, dataset_name=dataset.name,
), ),
"data_id": data_id, "data_id": data_id,
} }
return return
except UnsupportedPathSchemeError as e:
logger.warning(f"data_item does not support incremental loading: {str(e)}")
# Fall back to regular processing since incremental loading is not supported
async for result in run_tasks_data_item_regular(
data_item=data_item,
dataset=dataset,
tasks=tasks,
pipeline_id=pipeline_id,
pipeline_run_id=pipeline_run_id,
context=context,
user=user,
):
yield result
return
try: try:
# Process data based on data_item and list of tasks # Process data based on data_item and list of tasks

View file

@@ -12,7 +12,6 @@ async def test_add_fails_when_preferred_loader_not_specified():
with pytest.raises(ValueError): with pytest.raises(ValueError):
await cognee.add( await cognee.add(
"https://en.wikipedia.org/wiki/Large_language_model", "https://en.wikipedia.org/wiki/Large_language_model",
incremental_loading=False, # TODO: incremental loading bypasses regular data ingestion, which breaks. Will fix
) )
@@ -42,7 +41,70 @@ async def test_add_succesfully_adds_url_when_preferred_loader_specified():
await cognee.add( await cognee.add(
"https://en.wikipedia.org/wiki/Large_language_model", "https://en.wikipedia.org/wiki/Large_language_model",
preferred_loaders=["web_url_loader"], preferred_loaders=["web_url_loader"],
incremental_loading=False, # TODO: incremental loading bypasses regular data ingestion, which breaks. Will fix loaders_config=loaders_config,
)
except Exception as e:
pytest.fail(f"Failed to add url: {e}")
@pytest.mark.asyncio
async def test_add_with_incremental_loading_works():
await cognee.prune.prune_data()
await cognee.prune.prune_system(metadata=True)
extraction_rules = {
"title": {"selector": "title"},
"headings": {"selector": "h1, h2, h3", "all": True},
"links": {"selector": "a", "attr": "href", "all": True},
"paragraphs": {"selector": "p", "all": True},
}
loaders_config = {
"web_url_loader": {
"soup_config": {
"max_depth": 1,
"follow_links": False,
"extraction_rules": extraction_rules,
}
}
}
try:
await cognee.add(
"https://en.wikipedia.org/wiki/Large_language_model",
preferred_loaders=["web_url_loader"],
incremental_loading=True,
loaders_config=loaders_config,
)
except Exception as e:
pytest.fail(f"Failed to add url: {e}")
@pytest.mark.asyncio
async def test_add_without_incremental_loading_works():
await cognee.prune.prune_data()
await cognee.prune.prune_system(metadata=True)
extraction_rules = {
"title": {"selector": "title"},
"headings": {"selector": "h1, h2, h3", "all": True},
"links": {"selector": "a", "attr": "href", "all": True},
"paragraphs": {"selector": "p", "all": True},
}
loaders_config = {
"web_url_loader": {
"soup_config": {
"max_depth": 1,
"follow_links": False,
"extraction_rules": extraction_rules,
}
}
}
try:
await cognee.add(
"https://en.wikipedia.org/wiki/Large_language_model",
preferred_loaders=["web_url_loader"],
incremental_loading=False,
loaders_config=loaders_config, loaders_config=loaders_config,
) )
except Exception as e: except Exception as e: