# Origin: git patch f3ce7be88588e1b5c126285b1cb39ca85dc234db
# Author: Igor Ilic, Wed, 11 Dec 2024 14:31:54 +0100
# Subject: feat: Add ability to send directories with data to cognee
#          (Feature COG-656)
# Target file: cognee/tasks/ingestion/resolve_data_directories.py
#
# The same patch also exports this function from
# cognee/tasks/ingestion/__init__.py via:
#     from .resolve_data_directories import resolve_data_directories

import io
import os
from typing import List, Union, BinaryIO


def _is_binary_stream(item) -> bool:
    """Return True when *item* looks like a readable binary stream."""
    # `typing.BinaryIO` is only an annotation helper: concrete streams such as
    # `io.BytesIO` or the result of `open(path, "rb")` do not inherit from it,
    # so the real `io` base classes have to be checked as well.
    if isinstance(item, (io.RawIOBase, io.BufferedIOBase, BinaryIO)):
        return True
    # Text streams and plain str/bytes values are explicitly not binary streams.
    if isinstance(item, (str, bytes, io.TextIOBase)):
        return False
    # Fallback for file-like wrappers that do not subclass the `io` bases
    # (presumably e.g. tempfile/upload wrappers); a `read()` method is the
    # minimum a consumer of the stream needs.
    return callable(getattr(item, "read", None))


def resolve_data_directories(
    data: Union[BinaryIO, List[BinaryIO], str, List[str]],
    include_subdirectories: bool = True,
) -> List[Union[str, BinaryIO]]:
    """
    Resolves directories by replacing them with their contained files.

    Args:
        data: A single file path, directory path, or binary stream, or a list
            of such items.
        include_subdirectories: Whether to include files in subdirectories
            recursively. When False only the files located directly inside a
            directory are returned.

    Returns:
        A flat list of file paths and binary streams. Files found inside a
        directory are listed in sorted order so the result is deterministic.

    Raises:
        ValueError: If a string item is neither an existing file nor an
            existing directory.
        TypeError: If an item is neither a string nor a binary stream.

    Example:
        files = resolve_data_directories(
            ["/path/to/dir", "/path/to/file.txt"], include_subdirectories=True
        )
    """
    # Ensure `data` is a list so single items and lists share one code path
    if not isinstance(data, list):
        data = [data]

    resolved_data = []

    for item in data:
        if isinstance(item, str):  # Strings are treated as filesystem paths
            if os.path.isdir(item):
                if include_subdirectories:
                    # Recursively add all files in the directory tree
                    for root, dirs, files in os.walk(item):
                        # Sorting `dirs` in place fixes the traversal order
                        # that os.walk uses for the remaining subdirectories.
                        dirs.sort()
                        resolved_data.extend(
                            os.path.join(root, name) for name in sorted(files)
                        )
                else:
                    # Add only the files located directly in the directory
                    for name in sorted(os.listdir(item)):
                        path = os.path.join(item, name)
                        if os.path.isfile(path):
                            resolved_data.append(path)
            elif os.path.isfile(item):
                resolved_data.append(item)
            else:
                raise ValueError(f"Path '{item}' is neither a file nor a directory.")
        elif _is_binary_stream(item):  # Binary streams are passed through as-is
            resolved_data.append(item)
        else:
            raise TypeError(f"Unsupported type: {type(item)}. Expected str or BinaryIO.")

    return resolved_data