diff --git a/cognee/modules/pipelines/operations/run_tasks.py b/cognee/modules/pipelines/operations/run_tasks.py
index 13dc22da3..51a7c6adc 100644
--- a/cognee/modules/pipelines/operations/run_tasks.py
+++ b/cognee/modules/pipelines/operations/run_tasks.py
@@ -8,6 +8,7 @@ from functools import wraps
 from cognee.infrastructure.databases.relational import get_relational_engine
 from cognee.modules.pipelines.operations.run_tasks_distributed import run_tasks_distributed
 from cognee.modules.users.models import User
+from cognee.modules.data.models import Data
 from cognee.modules.ingestion.methods import get_s3_fs, open_data_file
 from cognee.shared.logging_utils import get_logger
 from cognee.modules.users.methods import get_default_user
@@ -89,21 +90,26 @@ async def run_tasks(
     try:
         if not isinstance(data, list):
             data = [data]

-        data = await resolve_data_directories(data)
         # TODO: Convert to async gather task instead of for loop (just make sure it can work there were some issues when async gathering datasets)
         for data_item in data:
-            file_path = await save_data_item_to_storage(data_item, dataset.name)
-            # Ingest data and add metadata
-            with open_data_file(file_path, s3fs=fs) as file:
-                classified_data = ingestion.classify(file, s3fs=fs)
-                # data_id is the hash of file contents + owner id to avoid duplicate data
-                data_id = ingestion.identify(classified_data, user)
+            # If data is being added to Cognee for the first time calculate the id of the data
+            if not isinstance(data_item, Data):
+                data = await resolve_data_directories(data)
+                file_path = await save_data_item_to_storage(data_item, dataset.name)
+                # Ingest data and add metadata
+                with open_data_file(file_path, s3fs=fs) as file:
+                    classified_data = ingestion.classify(file, s3fs=fs)
+                    # data_id is the hash of file contents + owner id to avoid duplicate data
+                    data_id = ingestion.identify(classified_data, user)
+            else:
+                # If data was already processed by Cognee get data id
+                data_id = data_item.id

             try:
                 async for result in run_tasks_with_telemetry(
                     tasks=tasks,
-                    data=data_item,
+                    data=[data_item],
                     user=user,
                     pipeline_name=pipeline_id,
                     context=context,
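
For review context, here is a minimal standalone sketch of the branching the second hunk introduces: items that are already `Data` rows reuse their stored `id`, while raw inputs get an id derived from the file-content hash plus the owner id (which is what the inline comment says `ingestion.identify` does). The `identify` and `resolve_data_id` functions and the `Data` stand-in below are hypothetical stubs for illustration, not cognee's actual API.

```python
import hashlib
import uuid
from dataclasses import dataclass


@dataclass
class Data:
    """Stand-in for cognee.modules.data.models.Data (an already-ingested row)."""
    id: uuid.UUID


def identify(content: bytes, owner_id: str) -> uuid.UUID:
    # Stub mirroring what the diff's comment says ingestion.identify does:
    # derive a stable id from the content hash plus the owner id, so the
    # same bytes added twice by the same user dedupe to a single id.
    digest = hashlib.sha256(content + owner_id.encode()).hexdigest()
    return uuid.uuid5(uuid.NAMESPACE_OID, digest)


def resolve_data_id(data_item, owner_id: str) -> uuid.UUID:
    if isinstance(data_item, Data):
        # Already processed: reuse the stored id, skip re-ingestion.
        return data_item.id
    # First-time input: compute the deduplicating id from the raw bytes.
    return identify(data_item, owner_id)


if __name__ == "__main__":
    owner = "user-123"
    raw = b"hello world"
    first = resolve_data_id(raw, owner)              # derived from content hash
    again = resolve_data_id(raw, owner)              # same bytes -> same id
    stored = resolve_data_id(Data(id=first), owner)  # Data row -> stored id
    assert first == again == stored
```

Both paths converge on the same `data_id` for the same content and owner, which is what makes re-adding previously ingested data idempotent: the already-processed branch skips storage and classification entirely and hands the stored id straight to the pipeline.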