fix: Resolve S3 adding specific files issue
This commit is contained in:
parent
67b61ff964
commit
dc38ff3838
2 changed files with 6 additions and 1 deletion
|
|
@@ -93,13 +93,15 @@ async def run_tasks(
|
||||||
if not isinstance(data, list):
|
if not isinstance(data, list):
|
||||||
data = [data]
|
data = [data]
|
||||||
|
|
||||||
|
if incremental_loading:
|
||||||
|
data = await resolve_data_directories(data)
|
||||||
|
|
||||||
# TODO: Convert to async gather task instead of for loop (just make sure it can work there were some issues when async gathering datasets)
|
# TODO: Convert to async gather task instead of for loop (just make sure it can work there were some issues when async gathering datasets)
|
||||||
for data_item in data:
|
for data_item in data:
|
||||||
# If incremental_loading of data is set to True don't process documents already processed by pipeline
|
# If incremental_loading of data is set to True don't process documents already processed by pipeline
|
||||||
if incremental_loading:
|
if incremental_loading:
|
||||||
# If data is being added to Cognee for the first time calculate the id of the data
|
# If data is being added to Cognee for the first time calculate the id of the data
|
||||||
if not isinstance(data_item, Data):
|
if not isinstance(data_item, Data):
|
||||||
data = await resolve_data_directories(data)
|
|
||||||
file_path = await save_data_item_to_storage(data_item, dataset.name)
|
file_path = await save_data_item_to_storage(data_item, dataset.name)
|
||||||
# Ingest data and add metadata
|
# Ingest data and add metadata
|
||||||
with open_data_file(file_path, s3fs=fs) as file:
|
with open_data_file(file_path, s3fs=fs) as file:
|
||||||
|
|
|
||||||
|
|
@@ -40,6 +40,9 @@ async def resolve_data_directories(
|
||||||
if include_subdirectories:
|
if include_subdirectories:
|
||||||
base_path = item if item.endswith("/") else item + "/"
|
base_path = item if item.endswith("/") else item + "/"
|
||||||
s3_keys = fs.glob(base_path + "**")
|
s3_keys = fs.glob(base_path + "**")
|
||||||
|
# If path is not directory attempt to add item directly
|
||||||
|
if not s3_keys:
|
||||||
|
s3_keys = fs.ls(item)
|
||||||
else:
|
else:
|
||||||
s3_keys = fs.ls(item)
|
s3_keys = fs.ls(item)
|
||||||
# Filter out keys that represent directories using fs.isdir
|
# Filter out keys that represent directories using fs.isdir
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue