From dc38ff38387075ec9c12bebd5e00a83ef8623bb1 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Thu, 10 Jul 2025 23:03:48 +0200 Subject: [PATCH] fix: Resolve S3 adding specific files issue --- cognee/modules/pipelines/operations/run_tasks.py | 4 +++- cognee/tasks/ingestion/resolve_data_directories.py | 3 +++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/cognee/modules/pipelines/operations/run_tasks.py b/cognee/modules/pipelines/operations/run_tasks.py index e7968176f..549f233aa 100644 --- a/cognee/modules/pipelines/operations/run_tasks.py +++ b/cognee/modules/pipelines/operations/run_tasks.py @@ -93,13 +93,15 @@ async def run_tasks( if not isinstance(data, list): data = [data] + if incremental_loading: + data = await resolve_data_directories(data) + # TODO: Convert to async gather task instead of for loop (just make sure it can work there were some issues when async gathering datasets) for data_item in data: # If incremental_loading of data is set to True don't process documents already processed by pipeline if incremental_loading: # If data is being added to Cognee for the first time calculate the id of the data if not isinstance(data_item, Data): - data = await resolve_data_directories(data) file_path = await save_data_item_to_storage(data_item, dataset.name) # Ingest data and add metadata with open_data_file(file_path, s3fs=fs) as file: diff --git a/cognee/tasks/ingestion/resolve_data_directories.py b/cognee/tasks/ingestion/resolve_data_directories.py index e8bdbca7d..9447eded2 100644 --- a/cognee/tasks/ingestion/resolve_data_directories.py +++ b/cognee/tasks/ingestion/resolve_data_directories.py @@ -40,6 +40,9 @@ async def resolve_data_directories( if include_subdirectories: base_path = item if item.endswith("/") else item + "/" s3_keys = fs.glob(base_path + "**") + # If path is not directory attempt to add item directly + if not s3_keys: + s3_keys = fs.ls(item) else: s3_keys = fs.ls(item) # Filter out keys that represent directories using fs.isdir