From af2b0735f660a465d6cfb3fbebcaadaef7a44940 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Thu, 10 Jul 2025 20:46:36 +0200 Subject: [PATCH] feat: incremental load initial commit --- cognee/api/v1/cognify/cognify.py | 1 + cognee/modules/data/models/Data.py | 1 + cognee/modules/ingestion/methods/__init__.py | 2 + cognee/modules/ingestion/methods/get_s3_fs.py | 14 ++++ .../ingestion/methods/open_data_file.py | 6 ++ .../pipelines/models/PipelineRunInfo.py | 1 + .../modules/pipelines/operations/pipeline.py | 2 + .../modules/pipelines/operations/run_tasks.py | 79 ++++++++++++++++--- cognee/tasks/ingestion/ingest_data.py | 23 +----- 9 files changed, 96 insertions(+), 33 deletions(-) create mode 100644 cognee/modules/ingestion/methods/__init__.py create mode 100644 cognee/modules/ingestion/methods/get_s3_fs.py create mode 100644 cognee/modules/ingestion/methods/open_data_file.py diff --git a/cognee/api/v1/cognify/cognify.py b/cognee/api/v1/cognify/cognify.py index 8b9a1be65..af9dac8f1 100644 --- a/cognee/api/v1/cognify/cognify.py +++ b/cognee/api/v1/cognify/cognify.py @@ -242,6 +242,7 @@ async def run_cognify_as_background_process( async def handle_rest_of_the_run(pipeline_list): # Execute all provided pipelines one by one to avoid database write conflicts + # TODO: Convert to async gather task instead of for loop when Queue mechanism for database is created for pipeline in pipeline_list: while True: try: diff --git a/cognee/modules/data/models/Data.py b/cognee/modules/data/models/Data.py index 49ab28271..f9ae0f50f 100644 --- a/cognee/modules/data/models/Data.py +++ b/cognee/modules/data/models/Data.py @@ -24,6 +24,7 @@ class Data(Base): token_count = Column(Integer) created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)) updated_at = Column(DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc)) + pipeline_status = Column(JSON) datasets = relationship( "Dataset", diff --git a/cognee/modules/ingestion/methods/__init__.py 
b/cognee/modules/ingestion/methods/__init__.py new file mode 100644 index 000000000..06800340c --- /dev/null +++ b/cognee/modules/ingestion/methods/__init__.py @@ -0,0 +1,2 @@ +from .get_s3_fs import get_s3_fs +from .open_data_file import open_data_file diff --git a/cognee/modules/ingestion/methods/get_s3_fs.py b/cognee/modules/ingestion/methods/get_s3_fs.py new file mode 100644 index 000000000..a1a91000f --- /dev/null +++ b/cognee/modules/ingestion/methods/get_s3_fs.py @@ -0,0 +1,14 @@ +from cognee.api.v1.add.config import get_s3_config + + +def get_s3_fs(): + s3_config = get_s3_config() + + fs = None + if s3_config.aws_access_key_id is not None and s3_config.aws_secret_access_key is not None: + import s3fs + + fs = s3fs.S3FileSystem( + key=s3_config.aws_access_key_id, secret=s3_config.aws_secret_access_key, anon=False + ) + return fs diff --git a/cognee/modules/ingestion/methods/open_data_file.py b/cognee/modules/ingestion/methods/open_data_file.py new file mode 100644 index 000000000..e59ee0eb7 --- /dev/null +++ b/cognee/modules/ingestion/methods/open_data_file.py @@ -0,0 +1,6 @@ +def open_data_file(file_path: str, s3fs): + if file_path.startswith("s3://"): + return s3fs.open(file_path, mode="rb") + else: + local_path = file_path.replace("file://", "") + return open(local_path, mode="rb") diff --git a/cognee/modules/pipelines/models/PipelineRunInfo.py b/cognee/modules/pipelines/models/PipelineRunInfo.py index d910f4fc8..429e730c8 100644 --- a/cognee/modules/pipelines/models/PipelineRunInfo.py +++ b/cognee/modules/pipelines/models/PipelineRunInfo.py @@ -9,6 +9,7 @@ class PipelineRunInfo(BaseModel): dataset_id: UUID dataset_name: str payload: Optional[Any] = None + data_ingestion_info: Optional[dict] = None model_config = { "arbitrary_types_allowed": True, diff --git a/cognee/modules/pipelines/operations/pipeline.py b/cognee/modules/pipelines/operations/pipeline.py index 8ffe33a8a..6f23af1a5 100644 --- a/cognee/modules/pipelines/operations/pipeline.py +++ 
b/cognee/modules/pipelines/operations/pipeline.py @@ -2,6 +2,7 @@ import asyncio from typing import Union from uuid import NAMESPACE_OID, uuid5, UUID +from cognee.modules.ingestion.exceptions import IngestionError from cognee.shared.logging_utils import get_logger from cognee.modules.data.methods.get_dataset_data import get_dataset_data from cognee.modules.data.models import Data, Dataset @@ -26,6 +27,7 @@ from cognee.modules.data.methods import ( from cognee.modules.pipelines.models.PipelineRunInfo import ( PipelineRunCompleted, PipelineRunStarted, + PipelineRunErrored, ) from cognee.infrastructure.databases.relational import ( diff --git a/cognee/modules/pipelines/operations/run_tasks.py b/cognee/modules/pipelines/operations/run_tasks.py index a301b2e32..13dc22da3 100644 --- a/cognee/modules/pipelines/operations/run_tasks.py +++ b/cognee/modules/pipelines/operations/run_tasks.py @@ -1,4 +1,6 @@ import os +import cognee.modules.ingestion as ingestion + from uuid import UUID from typing import Any from functools import wraps @@ -6,9 +8,11 @@ from functools import wraps from cognee.infrastructure.databases.relational import get_relational_engine from cognee.modules.pipelines.operations.run_tasks_distributed import run_tasks_distributed from cognee.modules.users.models import User +from cognee.modules.ingestion.methods import get_s3_fs, open_data_file from cognee.shared.logging_utils import get_logger from cognee.modules.users.methods import get_default_user from cognee.modules.pipelines.utils import generate_pipeline_id +from cognee.tasks.ingestion import save_data_item_to_storage, resolve_data_directories from cognee.modules.pipelines.models.PipelineRunInfo import ( PipelineRunCompleted, PipelineRunErrored, @@ -79,20 +83,67 @@ async def run_tasks( payload=data, ) + fs = get_s3_fs() + data_items_pipeline_run_info = {} + ingestion_error = None try: - async for result in run_tasks_with_telemetry( - tasks=tasks, - data=data, - user=user, - pipeline_name=pipeline_id, - 
context=context, - ): - yield PipelineRunYield( - pipeline_run_id=pipeline_run_id, - dataset_id=dataset.id, - dataset_name=dataset.name, - payload=result, - ) + if not isinstance(data, list): + data = [data] + data = await resolve_data_directories(data) + + # TODO: Convert to async gather task instead of for loop (just make sure it can work there were some issues when async gathering datasets) + for data_item in data: + file_path = await save_data_item_to_storage(data_item, dataset.name) + # Ingest data and add metadata + with open_data_file(file_path, s3fs=fs) as file: + classified_data = ingestion.classify(file, s3fs=fs) + # data_id is the hash of file contents + owner id to avoid duplicate data + data_id = ingestion.identify(classified_data, user) + + try: + async for result in run_tasks_with_telemetry( + tasks=tasks, + data=data_item, + user=user, + pipeline_name=pipeline_id, + context=context, + ): + yield PipelineRunYield( + pipeline_run_id=pipeline_run_id, + dataset_id=dataset.id, + dataset_name=dataset.name, + payload=result, + ) + + data_items_pipeline_run_info[data_id] = { + "run_info": PipelineRunCompleted( + pipeline_run_id=pipeline_run_id, + dataset_id=dataset.id, + dataset_name=dataset.name, + ), + "data_id": data_id, + } + + except Exception as error: + # Temporarily swallow error and try to process rest of documents first, then re-raise error at end of data ingestion pipeline + ingestion_error = error + logger.error( + f"Exception caught while processing data: {error}.\n Data processing failed for data item: {data_item}." 
+ ) + + data_items_pipeline_run_info[data_id] = { + "run_info": PipelineRunErrored( + pipeline_run_id=pipeline_run_id, + payload=error, + dataset_id=dataset.id, + dataset_name=dataset.name, + ), + "data_id": data_id, + } + + # re-raise error found during data ingestion + if ingestion_error: + raise ingestion_error await log_pipeline_run_complete( pipeline_run_id, pipeline_id, pipeline_name, dataset_id, data @@ -102,6 +153,7 @@ async def run_tasks( pipeline_run_id=pipeline_run_id, dataset_id=dataset.id, dataset_name=dataset.name, + data_ingestion_info=data_items_pipeline_run_info, ) except Exception as error: @@ -114,6 +166,7 @@ async def run_tasks( payload=error, dataset_id=dataset.id, dataset_name=dataset.name, + data_ingestion_info=data_items_pipeline_run_info, ) raise error diff --git a/cognee/tasks/ingestion/ingest_data.py b/cognee/tasks/ingestion/ingest_data.py index f3fa43b62..cbd3a8861 100644 --- a/cognee/tasks/ingestion/ingest_data.py +++ b/cognee/tasks/ingestion/ingest_data.py @@ -9,6 +9,7 @@ from cognee.modules.data.models import Data from cognee.modules.users.models import User from cognee.modules.users.methods import get_default_user from cognee.modules.users.permissions.methods import get_specific_user_permission_datasets +from cognee.modules.ingestion.methods import get_s3_fs, open_data_file from cognee.modules.data.methods import ( get_authorized_existing_datasets, get_dataset_data, @@ -18,9 +19,6 @@ from cognee.modules.data.methods import ( from .save_data_item_to_storage import save_data_item_to_storage -from cognee.api.v1.add.config import get_s3_config - - async def ingest_data( data: Any, dataset_name: str, @@ -31,22 +29,7 @@ async def ingest_data( if not user: user = await get_default_user() - s3_config = get_s3_config() - - fs = None - if s3_config.aws_access_key_id is not None and s3_config.aws_secret_access_key is not None: - import s3fs - - fs = s3fs.S3FileSystem( - key=s3_config.aws_access_key_id, secret=s3_config.aws_secret_access_key, anon=False
- ) - - def open_data_file(file_path: str): - if file_path.startswith("s3://"): - return fs.open(file_path, mode="rb") - else: - local_path = file_path.replace("file://", "") - return open(local_path, mode="rb") + fs = get_s3_fs() def get_external_metadata_dict(data_item: Union[BinaryIO, str, Any]) -> dict[str, Any]: if hasattr(data_item, "dict") and inspect.ismethod(getattr(data_item, "dict")): @@ -95,7 +78,7 @@ async def ingest_data( file_path = await save_data_item_to_storage(data_item, dataset_name) # Ingest data and add metadata - with open_data_file(file_path) as file: + with open_data_file(file_path, s3fs=fs) as file: classified_data = ingestion.classify(file, s3fs=fs) # data_id is the hash of file contents + owner id to avoid duplicate data