From 01582d7a55bac8c607f053ac29c23473b3022b79 Mon Sep 17 00:00:00 2001 From: Boris Date: Mon, 30 Sep 2024 14:09:20 +0200 Subject: [PATCH] feat: split add into tasks and use pipeline architecture (#141) * feat: split add into tasks and use pipeline architecture --- cognee/__init__.py | 4 +- cognee/api/v1/add/__init__.py | 2 +- cognee/api/v1/add/add_v2.py | 22 +++++ cognee/api/v1/cognify/__init__.py | 1 + cognee/modules/ingestion/classify.py | 6 +- .../modules/pipelines/operations/run_tasks.py | 2 +- cognee/modules/pipelines/tasks/Task.py | 2 +- cognee/tasks/ingestion/__init__.py | 2 + cognee/tasks/ingestion/ingest_data.py | 92 +++++++++++++++++++ .../tasks/ingestion/save_data_to_storage.py | 27 ++++++ 10 files changed, 151 insertions(+), 9 deletions(-) create mode 100644 cognee/api/v1/add/add_v2.py create mode 100644 cognee/tasks/ingestion/__init__.py create mode 100644 cognee/tasks/ingestion/ingest_data.py create mode 100644 cognee/tasks/ingestion/save_data_to_storage.py diff --git a/cognee/__init__.py b/cognee/__init__.py index 04c13eae1..1d488568c 100644 --- a/cognee/__init__.py +++ b/cognee/__init__.py @@ -1,6 +1,6 @@ from .api.v1.config.config import config -from .api.v1.add.add import add -from .api.v1.cognify.cognify_v2 import cognify +from .api.v1.add import add +from .api.v1.cognify import cognify from .api.v1.datasets.datasets import datasets from .api.v1.search.search import search, SearchType from .api.v1.prune import prune diff --git a/cognee/api/v1/add/__init__.py b/cognee/api/v1/add/__init__.py index 08e0be68b..e8d8d9f93 100644 --- a/cognee/api/v1/add/__init__.py +++ b/cognee/api/v1/add/__init__.py @@ -1 +1 @@ -from .add import add +from .add_v2 import add diff --git a/cognee/api/v1/add/add_v2.py b/cognee/api/v1/add/add_v2.py new file mode 100644 index 000000000..291ec5f4c --- /dev/null +++ b/cognee/api/v1/add/add_v2.py @@ -0,0 +1,22 @@ +from typing import Union, BinaryIO +from cognee.modules.users.models import User +from cognee.modules.users.methods import get_default_user +from cognee.modules.pipelines import run_tasks, Task +from cognee.tasks.ingestion import save_data_to_storage, ingest_data +from cognee.infrastructure.databases.relational import create_db_and_tables + +async def add(data: Union[BinaryIO, list[BinaryIO], str, list[str]], dataset_name: str = "main_dataset", user: User = None): + await create_db_and_tables() + + if user is None: + user = await get_default_user() + + tasks = [ + Task(save_data_to_storage, dataset_name), + Task(ingest_data, dataset_name, user) + ] + + pipeline = run_tasks(tasks, data, "add_pipeline") + + async for result in pipeline: + print(result) diff --git a/cognee/api/v1/cognify/__init__.py b/cognee/api/v1/cognify/__init__.py index e69de29bb..4e1662bf0 100644 --- a/cognee/api/v1/cognify/__init__.py +++ b/cognee/api/v1/cognify/__init__.py @@ -0,0 +1 @@ +from .cognify_v2 import cognify diff --git a/cognee/modules/ingestion/classify.py b/cognee/modules/ingestion/classify.py index d0c3ca0aa..8e8c9fb00 100644 --- a/cognee/modules/ingestion/classify.py +++ b/cognee/modules/ingestion/classify.py @@ -2,15 +2,13 @@ from io import BufferedReader from typing import Union, BinaryIO from .exceptions import IngestionException from .data_types import TextData, BinaryData +from tempfile import SpooledTemporaryFile def classify(data: Union[str, BinaryIO], filename: str = None): if isinstance(data, str): return TextData(data) - if isinstance(data, BufferedReader): + if isinstance(data, BufferedReader) or isinstance(data, SpooledTemporaryFile): return BinaryData(data, data.name.split("/")[-1] if data.name else filename) - if hasattr(data, "file"): - return BinaryData(data.file, filename) - raise IngestionException(f"Type of data sent to classify(data: Union[str, BinaryIO) not supported: {type(data)}") diff --git a/cognee/modules/pipelines/operations/run_tasks.py b/cognee/modules/pipelines/operations/run_tasks.py index 62a0c346a..5f15aae80 100644 --- a/cognee/modules/pipelines/operations/run_tasks.py +++ b/cognee/modules/pipelines/operations/run_tasks.py @@ -129,7 +129,7 @@ async def run_tasks_base(tasks: [Task], data = None, user: User = None): "task_name": running_task.executable.__name__, }) raise error - + elif inspect.isfunction(running_task.executable): logger.info("Function task started: `%s`", running_task.executable.__name__) send_telemetry("Function Task Started", user.id, { diff --git a/cognee/modules/pipelines/tasks/Task.py b/cognee/modules/pipelines/tasks/Task.py index 8f3c510cb..7de60643a 100644 --- a/cognee/modules/pipelines/tasks/Task.py +++ b/cognee/modules/pipelines/tasks/Task.py @@ -26,7 +26,7 @@ class Task(): self.task_config["batch_size"] = 1 def run(self, *args, **kwargs): - combined_args = self.default_params["args"] + args + combined_args = args + self.default_params["args"] combined_kwargs = { **self.default_params["kwargs"], **kwargs } return self.executable(*combined_args, **combined_kwargs) diff --git a/cognee/tasks/ingestion/__init__.py b/cognee/tasks/ingestion/__init__.py new file mode 100644 index 000000000..cc36e4ebe --- /dev/null +++ b/cognee/tasks/ingestion/__init__.py @@ -0,0 +1,2 @@ +from .ingest_data import ingest_data +from .save_data_to_storage import save_data_to_storage diff --git a/cognee/tasks/ingestion/ingest_data.py b/cognee/tasks/ingestion/ingest_data.py new file mode 100644 index 000000000..a7ccdd8fa --- /dev/null +++ b/cognee/tasks/ingestion/ingest_data.py @@ -0,0 +1,92 @@ +import dlt +import cognee.modules.ingestion as ingestion + +from cognee.shared.utils import send_telemetry +from cognee.modules.users.models import User +from cognee.infrastructure.databases.relational import get_relational_config, get_relational_engine +from cognee.modules.data.methods import create_dataset +from cognee.modules.users.permissions.methods import give_permission_on_document + +async def ingest_data(file_paths: list[str], dataset_name: str, user: User): + relational_config = get_relational_config() + + destination = dlt.destinations.sqlalchemy( + credentials = { + "host": relational_config.db_host, + "port": relational_config.db_port, + "username": relational_config.db_username, + "password": relational_config.db_password, + "database": relational_config.db_name, + "drivername": relational_config.db_provider, + }, + ) + + pipeline = dlt.pipeline( + pipeline_name = "file_load_from_filesystem", + destination = destination, + ) + + @dlt.resource(standalone = True, merge_key = "id") + async def data_resources(file_paths: str, user: User): + for file_path in file_paths: + with open(file_path.replace("file://", ""), mode = "rb") as file: + classified_data = ingestion.classify(file) + + data_id = ingestion.identify(classified_data) + + file_metadata = classified_data.get_metadata() + + from sqlalchemy import select + from cognee.modules.data.models import Data + + db_engine = get_relational_engine() + + async with db_engine.get_async_session() as session: + dataset = await create_dataset(dataset_name, user.id, session) + + data = (await session.execute( + select(Data).filter(Data.id == data_id) + )).scalar_one_or_none() + + if data is not None: + data.name = file_metadata["name"] + data.raw_data_location = file_metadata["file_path"] + data.extension = file_metadata["extension"] + data.mime_type = file_metadata["mime_type"] + + await session.merge(data) + await session.commit() + else: + data = Data( + id = data_id, + name = file_metadata["name"], + raw_data_location = file_metadata["file_path"], + extension = file_metadata["extension"], + mime_type = file_metadata["mime_type"], + ) + + dataset.data.append(data) + await session.commit() + + yield { + "id": data_id, + "name": file_metadata["name"], + "file_path": file_metadata["file_path"], + "extension": file_metadata["extension"], + "mime_type": file_metadata["mime_type"], + } + + await give_permission_on_document(user, data_id, "read") + await give_permission_on_document(user, data_id, "write") + + + send_telemetry("cognee.add EXECUTION STARTED", user_id = user.id) + run_info = pipeline.run( + data_resources(file_paths, user), + table_name = "file_metadata", + dataset_name = dataset_name, + write_disposition = "merge", + ) + send_telemetry("cognee.add EXECUTION COMPLETED", user_id = user.id) + + return run_info diff --git a/cognee/tasks/ingestion/save_data_to_storage.py b/cognee/tasks/ingestion/save_data_to_storage.py new file mode 100644 index 000000000..f646db52e --- /dev/null +++ b/cognee/tasks/ingestion/save_data_to_storage.py @@ -0,0 +1,27 @@ +from typing import Union, BinaryIO +from cognee.modules.ingestion import save_data_to_file + +def save_data_to_storage(data: Union[BinaryIO, str], dataset_name) -> list[str]: + if not isinstance(data, list): + # Convert data to a list as we work with lists further down. + data = [data] + + file_paths = [] + + for data_item in data: + # data is a file object coming from upload. + if hasattr(data_item, "file"): + file_path = save_data_to_file(data_item.file, dataset_name, filename = data_item.filename) + file_paths.append(file_path) + + if isinstance(data_item, str): + # data is a file path + if data_item.startswith("file://") or data_item.startswith("/"): + file_paths.append(data_item.replace("file://", "")) + + # data is text + else: + file_path = save_data_to_file(data_item, dataset_name) + file_paths.append(file_path) + + return file_paths