From 0c7c1d7503cd59855a6c46324f64275360ef488c Mon Sep 17 00:00:00 2001
From: Igor Ilic
Date: Mon, 20 Jan 2025 14:33:47 +0100
Subject: [PATCH] refactor: Refactor ingestion to only have one ingestion task

---
 cognee/api/v1/add/add_v2.py                    |   4 +-
 cognee/api/v1/cognify/code_graph_pipeline.py   |   4 +-
 cognee/tasks/ingestion/__init__.py             |   5 +-
 cognee/tasks/ingestion/ingest_data.py          | 126 ++++++++++-----
 .../ingestion/ingest_data_with_metadata.py     | 145 ------------------
 .../ingestion/save_data_item_to_storage.py     |  12 +-
 ...save_data_item_with_metadata_to_storage.py  |  30 ----
 .../tasks/ingestion/save_data_to_storage.py    |  16 --
 8 files changed, 99 insertions(+), 243 deletions(-)
 delete mode 100644 cognee/tasks/ingestion/ingest_data_with_metadata.py
 delete mode 100644 cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py
 delete mode 100644 cognee/tasks/ingestion/save_data_to_storage.py

diff --git a/cognee/api/v1/add/add_v2.py b/cognee/api/v1/add/add_v2.py
index 1ec30e67b..dbdb93a59 100644
--- a/cognee/api/v1/add/add_v2.py
+++ b/cognee/api/v1/add/add_v2.py
@@ -2,7 +2,7 @@ from typing import Union, BinaryIO
 from cognee.modules.users.models import User
 from cognee.modules.users.methods import get_default_user
 from cognee.modules.pipelines import run_tasks, Task
-from cognee.tasks.ingestion import ingest_data_with_metadata, resolve_data_directories
+from cognee.tasks.ingestion import ingest_data, resolve_data_directories
 from cognee.infrastructure.databases.relational import (
     create_db_and_tables as create_relational_db_and_tables,
 )
@@ -22,7 +22,7 @@ async def add(
     if user is None:
         user = await get_default_user()
 
-    tasks = [Task(resolve_data_directories), Task(ingest_data_with_metadata, dataset_name, user)]
+    tasks = [Task(resolve_data_directories), Task(ingest_data, dataset_name, user)]
 
     pipeline = run_tasks(tasks, data, "add_pipeline")
 
diff --git a/cognee/api/v1/cognify/code_graph_pipeline.py b/cognee/api/v1/cognify/code_graph_pipeline.py
index 2d077f39b..dc2af0cd5 100644
--- a/cognee/api/v1/cognify/code_graph_pipeline.py
+++ b/cognee/api/v1/cognify/code_graph_pipeline.py
@@ -10,7 +10,7 @@ from cognee.modules.users.methods import get_default_user
 from cognee.shared.data_models import KnowledgeGraph, MonitoringTool
 from cognee.tasks.documents import classify_documents, extract_chunks_from_documents
 from cognee.tasks.graph import extract_graph_from_data
-from cognee.tasks.ingestion import ingest_data_with_metadata
+from cognee.tasks.ingestion import ingest_data
 from cognee.tasks.repo_processor import (
     enrich_dependency_graph,
     expand_dependency_graph,
@@ -68,7 +68,7 @@ async def run_code_graph_pipeline(repo_path, include_docs=True):
     if include_docs:
         non_code_tasks = [
             Task(get_non_py_files, task_config={"batch_size": 50}),
-            Task(ingest_data_with_metadata, dataset_name="repo_docs", user=user),
+            Task(ingest_data, dataset_name="repo_docs", user=user),
             Task(get_data_list_for_user, dataset_name="repo_docs", user=user),
             Task(classify_documents),
             Task(extract_chunks_from_documents, max_tokens=cognee_config.max_tokens),
diff --git a/cognee/tasks/ingestion/__init__.py b/cognee/tasks/ingestion/__init__.py
index 8b873b273..9c7180be2 100644
--- a/cognee/tasks/ingestion/__init__.py
+++ b/cognee/tasks/ingestion/__init__.py
@@ -1,6 +1,3 @@
-from .ingest_data import ingest_data
-from .save_data_to_storage import save_data_to_storage
 from .save_data_item_to_storage import save_data_item_to_storage
-from .save_data_item_with_metadata_to_storage import save_data_item_with_metadata_to_storage
-from .ingest_data_with_metadata import ingest_data_with_metadata
+from .ingest_data import ingest_data
 from .resolve_data_directories import resolve_data_directories
diff --git a/cognee/tasks/ingestion/ingest_data.py b/cognee/tasks/ingestion/ingest_data.py
index cf7dd38ad..2306aa2de 100644
--- a/cognee/tasks/ingestion/ingest_data.py
+++ b/cognee/tasks/ingestion/ingest_data.py
@@ -1,16 +1,21 @@
+from typing import Any, List
+
 import dlt
 import cognee.modules.ingestion as ingestion
-
-from uuid import UUID
-from cognee.shared.utils import send_telemetry
-from cognee.modules.users.models import User
 from cognee.infrastructure.databases.relational import get_relational_engine
 from cognee.modules.data.methods import create_dataset
+from cognee.modules.data.models.DatasetData import DatasetData
+from cognee.modules.users.models import User
 from cognee.modules.users.permissions.methods import give_permission_on_document
+from cognee.shared.utils import send_telemetry
+from cognee.modules.data.operations.write_metadata import write_metadata
 from .get_dlt_destination import get_dlt_destination
+from .save_data_item_to_storage import (
+    save_data_item_to_storage,
+)
 
 
-async def ingest_data(file_paths: list[str], dataset_name: str, user: User):
+async def ingest_data(data: Any, dataset_name: str, user: User):
     destination = get_dlt_destination()
 
     pipeline = dlt.pipeline(
@@ -18,12 +23,12 @@ async def ingest_data(file_paths: list[str], dataset_name: str, user: User):
         destination=destination,
     )
 
-    @dlt.resource(standalone=True, merge_key="id")
-    async def data_resources(file_paths: str):
+    @dlt.resource(standalone=True, primary_key="id", merge_key="id")
+    async def data_resources(file_paths: List[str], user: User):
         for file_path in file_paths:
             with open(file_path.replace("file://", ""), mode="rb") as file:
                 classified_data = ingestion.classify(file)
-                data_id = ingestion.identify(classified_data)
+                data_id = ingestion.identify(classified_data, user)
                 file_metadata = classified_data.get_metadata()
                 yield {
                     "id": data_id,
@@ -31,71 +36,110 @@ async def ingest_data(file_paths: list[str], dataset_name: str, user: User):
                     "file_path": file_metadata["file_path"],
                     "extension": file_metadata["extension"],
                     "mime_type": file_metadata["mime_type"],
+                    "content_hash": file_metadata["content_hash"],
+                    "owner_id": str(user.id),
                 }
 
-    async def data_storing(table_name, dataset_name, user: User):
-        db_engine = get_relational_engine()
+    async def data_storing(data: Any, dataset_name: str, user: User):
+        if not isinstance(data, list):
+            # Convert data to a list as we work with lists further down.
+            data = [data]
+
+        file_paths = []
+
+        # Process data
+        for data_item in data:
+            file_path = await save_data_item_to_storage(data_item, dataset_name)
+
+            file_paths.append(file_path)
+
+            # Ingest data and add metadata
+            with open(file_path.replace("file://", ""), mode="rb") as file:
+                classified_data = ingestion.classify(file)
+
+                # data_id is the hash of file contents + owner id to avoid duplicate data
+                data_id = ingestion.identify(classified_data, user)
+
+                file_metadata = classified_data.get_metadata()
 
-        async with db_engine.get_async_session() as session:
-            # Read metadata stored with dlt
-            files_metadata = await db_engine.get_all_data_from_table(table_name, dataset_name)
-            for file_metadata in files_metadata:
                 from sqlalchemy import select
+
                 from cognee.modules.data.models import Data
 
-                dataset = await create_dataset(dataset_name, user.id, session)
+                db_engine = get_relational_engine()
 
-                data = (
-                    await session.execute(select(Data).filter(Data.id == UUID(file_metadata["id"])))
-                ).scalar_one_or_none()
+                async with db_engine.get_async_session() as session:
+                    dataset = await create_dataset(dataset_name, user.id, session)
 
-                if data is not None:
-                    data.name = file_metadata["name"]
-                    data.raw_data_location = file_metadata["file_path"]
-                    data.extension = file_metadata["extension"]
-                    data.mime_type = file_metadata["mime_type"]
+                    # Check to see if data should be updated
+                    data_point = (
+                        await session.execute(select(Data).filter(Data.id == data_id))
+                    ).scalar_one_or_none()
+
+                    if data_point is not None:
+                        data_point.name = file_metadata["name"]
+                        data_point.raw_data_location = file_metadata["file_path"]
+                        data_point.extension = file_metadata["extension"]
+                        data_point.mime_type = file_metadata["mime_type"]
+                        data_point.owner_id = user.id
+                        data_point.content_hash = file_metadata["content_hash"]
+                        await session.merge(data_point)
+                    else:
+                        data_point = Data(
+                            id=data_id,
+                            name=file_metadata["name"],
+                            raw_data_location=file_metadata["file_path"],
+                            extension=file_metadata["extension"],
+                            mime_type=file_metadata["mime_type"],
+                            owner_id=user.id,
+                            content_hash=file_metadata["content_hash"],
+                        )
+
+                    # Check if data is already in dataset
+                    dataset_data = (
+                        await session.execute(
+                            select(DatasetData).filter(
+                                DatasetData.data_id == data_id, DatasetData.dataset_id == dataset.id
+                            )
+                        )
+                    ).scalar_one_or_none()
+                    # If data is not present in dataset add it
+                    if dataset_data is None:
+                        dataset.data.append(data_point)
 
-                    await session.merge(data)
                     await session.commit()
-                else:
-                    data = Data(
-                        id=UUID(file_metadata["id"]),
-                        name=file_metadata["name"],
-                        raw_data_location=file_metadata["file_path"],
-                        extension=file_metadata["extension"],
-                        mime_type=file_metadata["mime_type"],
-                    )
+                    await write_metadata(data_item, data_point.id, file_metadata)
 
-                    dataset.data.append(data)
-                    await session.commit()
-
-                await give_permission_on_document(user, UUID(file_metadata["id"]), "read")
-                await give_permission_on_document(user, UUID(file_metadata["id"]), "write")
+                    await give_permission_on_document(user, data_id, "read")
+                    await give_permission_on_document(user, data_id, "write")
+        return file_paths
 
     send_telemetry("cognee.add EXECUTION STARTED", user_id=user.id)
 
     db_engine = get_relational_engine()
 
+    file_paths = await data_storing(data, dataset_name, user)
+
     # Note: DLT pipeline has its own event loop, therefore objects created in another event loop
     # can't be used inside the pipeline
     if db_engine.engine.dialect.name == "sqlite":
         # To use sqlite with dlt dataset_name must be set to "main".
         # Sqlite doesn't support schemas
         run_info = pipeline.run(
-            data_resources(file_paths),
+            data_resources(file_paths, user),
             table_name="file_metadata",
             dataset_name="main",
             write_disposition="merge",
         )
     else:
+        # Data should be stored in the same schema to allow deduplication
         run_info = pipeline.run(
-            data_resources(file_paths),
+            data_resources(file_paths, user),
             table_name="file_metadata",
-            dataset_name=dataset_name,
+            dataset_name="public",
             write_disposition="merge",
         )
 
-    await data_storing("file_metadata", dataset_name, user)
     send_telemetry("cognee.add EXECUTION COMPLETED", user_id=user.id)
 
     return run_info
diff --git a/cognee/tasks/ingestion/ingest_data_with_metadata.py b/cognee/tasks/ingestion/ingest_data_with_metadata.py
deleted file mode 100644
index 04396485c..000000000
--- a/cognee/tasks/ingestion/ingest_data_with_metadata.py
+++ /dev/null
@@ -1,145 +0,0 @@
-from typing import Any, List
-
-import dlt
-import cognee.modules.ingestion as ingestion
-from cognee.infrastructure.databases.relational import get_relational_engine
-from cognee.modules.data.methods import create_dataset
-from cognee.modules.data.models.DatasetData import DatasetData
-from cognee.modules.users.models import User
-from cognee.modules.users.permissions.methods import give_permission_on_document
-from cognee.shared.utils import send_telemetry
-from cognee.modules.data.operations.write_metadata import write_metadata
-from .get_dlt_destination import get_dlt_destination
-from .save_data_item_with_metadata_to_storage import (
-    save_data_item_with_metadata_to_storage,
-)
-
-
-async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
-    destination = get_dlt_destination()
-
-    pipeline = dlt.pipeline(
-        pipeline_name="file_load_from_filesystem",
-        destination=destination,
-    )
-
-    @dlt.resource(standalone=True, primary_key="id", merge_key="id")
-    async def data_resources(file_paths: List[str], user: User):
-        for file_path in file_paths:
-            with open(file_path.replace("file://", ""), mode="rb") as file:
-                classified_data = ingestion.classify(file)
-                data_id = ingestion.identify(classified_data, user)
-                file_metadata = classified_data.get_metadata()
-                yield {
-                    "id": data_id,
-                    "name": file_metadata["name"],
-                    "file_path": file_metadata["file_path"],
-                    "extension": file_metadata["extension"],
-                    "mime_type": file_metadata["mime_type"],
-                    "content_hash": file_metadata["content_hash"],
-                    "owner_id": str(user.id),
-                }
-
-    async def data_storing(data: Any, dataset_name: str, user: User):
-        if not isinstance(data, list):
-            # Convert data to a list as we work with lists further down.
-            data = [data]
-
-        file_paths = []
-
-        # Process data
-        for data_item in data:
-            file_path = await save_data_item_with_metadata_to_storage(data_item, dataset_name)
-
-            file_paths.append(file_path)
-
-            # Ingest data and add metadata
-            with open(file_path.replace("file://", ""), mode="rb") as file:
-                classified_data = ingestion.classify(file)
-
-                # data_id is the hash of file contents + owner id to avoid duplicate data
-                data_id = ingestion.identify(classified_data, user)
-
-                file_metadata = classified_data.get_metadata()
-
-                from sqlalchemy import select
-
-                from cognee.modules.data.models import Data
-
-                db_engine = get_relational_engine()
-
-                async with db_engine.get_async_session() as session:
-                    dataset = await create_dataset(dataset_name, user.id, session)
-
-                    # Check to see if data should be updated
-                    data_point = (
-                        await session.execute(select(Data).filter(Data.id == data_id))
-                    ).scalar_one_or_none()
-
-                    if data_point is not None:
-                        data_point.name = file_metadata["name"]
-                        data_point.raw_data_location = file_metadata["file_path"]
-                        data_point.extension = file_metadata["extension"]
-                        data_point.mime_type = file_metadata["mime_type"]
-                        data_point.owner_id = user.id
-                        data_point.content_hash = file_metadata["content_hash"]
-                        await session.merge(data_point)
-                    else:
-                        data_point = Data(
-                            id=data_id,
-                            name=file_metadata["name"],
-                            raw_data_location=file_metadata["file_path"],
-                            extension=file_metadata["extension"],
-                            mime_type=file_metadata["mime_type"],
-                            owner_id=user.id,
-                            content_hash=file_metadata["content_hash"],
-                        )
-
-                    # Check if data is already in dataset
-                    dataset_data = (
-                        await session.execute(
-                            select(DatasetData).filter(
-                                DatasetData.data_id == data_id, DatasetData.dataset_id == dataset.id
-                            )
-                        )
-                    ).scalar_one_or_none()
-                    # If data is not present in dataset add it
-                    if dataset_data is None:
-                        dataset.data.append(data_point)
-
-                    await session.commit()
-                    await write_metadata(data_item, data_point.id, file_metadata)
-
-                    await give_permission_on_document(user, data_id, "read")
-                    await give_permission_on_document(user, data_id, "write")
-        return file_paths
-
-    send_telemetry("cognee.add EXECUTION STARTED", user_id=user.id)
-
-    db_engine = get_relational_engine()
-
-    file_paths = await data_storing(data, dataset_name, user)
-
-    # Note: DLT pipeline has its own event loop, therefore objects created in another event loop
-    # can't be used inside the pipeline
-    if db_engine.engine.dialect.name == "sqlite":
-        # To use sqlite with dlt dataset_name must be set to "main".
-        # Sqlite doesn't support schemas
-        run_info = pipeline.run(
-            data_resources(file_paths, user),
-            table_name="file_metadata",
-            dataset_name="main",
-            write_disposition="merge",
-        )
-    else:
-        # Data should be stored in the same schema to allow deduplication
-        run_info = pipeline.run(
-            data_resources(file_paths, user),
-            table_name="file_metadata",
-            dataset_name="public",
-            write_disposition="merge",
-        )
-
-    send_telemetry("cognee.add EXECUTION COMPLETED", user_id=user.id)
-
-    return run_info
diff --git a/cognee/tasks/ingestion/save_data_item_to_storage.py b/cognee/tasks/ingestion/save_data_item_to_storage.py
index 9191f7ebc..3f9d572c9 100644
--- a/cognee/tasks/ingestion/save_data_item_to_storage.py
+++ b/cognee/tasks/ingestion/save_data_item_to_storage.py
@@ -1,12 +1,18 @@
-from typing import Union, BinaryIO
+from typing import Union, BinaryIO, Any
 
 from cognee.modules.ingestion.exceptions import IngestionError
 from cognee.modules.ingestion import save_data_to_file
 
 
-def save_data_item_to_storage(data_item: Union[BinaryIO, str], dataset_name: str) -> str:
+async def save_data_item_to_storage(data_item: Union[BinaryIO, str, Any], dataset_name: str) -> str:
+    if "llama_index" in str(type(data_item)):
+        # Dynamic import is used because the llama_index module is optional.
+        from .transform_data import get_data_from_llama_index
+
+        file_path = get_data_from_llama_index(data_item, dataset_name)
+
     # data is a file object coming from upload.
-    if hasattr(data_item, "file"):
+    elif hasattr(data_item, "file"):
         file_path = save_data_to_file(data_item.file, filename=data_item.filename)
 
     elif isinstance(data_item, str):
diff --git a/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py b/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py
deleted file mode 100644
index 92697abb7..000000000
--- a/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py
+++ /dev/null
@@ -1,30 +0,0 @@
-from typing import Union, BinaryIO, Any
-
-from cognee.modules.ingestion.exceptions import IngestionError
-from cognee.modules.ingestion import save_data_to_file
-
-
-async def save_data_item_with_metadata_to_storage(
-    data_item: Union[BinaryIO, str, Any], dataset_name: str
-) -> str:
-    if "llama_index" in str(type(data_item)):
-        # Dynamic import is used because the llama_index module is optional.
-        from .transform_data import get_data_from_llama_index
-
-        file_path = get_data_from_llama_index(data_item, dataset_name)
-
-    # data is a file object coming from upload.
-    elif hasattr(data_item, "file"):
-        file_path = save_data_to_file(data_item.file, filename=data_item.filename)
-
-    elif isinstance(data_item, str):
-        # data is a file path
-        if data_item.startswith("file://") or data_item.startswith("/"):
-            file_path = data_item.replace("file://", "")
-        # data is text
-        else:
-            file_path = save_data_to_file(data_item)
-    else:
-        raise IngestionError(message=f"Data type not supported: {type(data_item)}")
-
-    return file_path
diff --git a/cognee/tasks/ingestion/save_data_to_storage.py b/cognee/tasks/ingestion/save_data_to_storage.py
deleted file mode 100644
index a56857261..000000000
--- a/cognee/tasks/ingestion/save_data_to_storage.py
+++ /dev/null
@@ -1,16 +0,0 @@
-from typing import Union, BinaryIO
-from cognee.tasks.ingestion.save_data_item_to_storage import save_data_item_to_storage
-
-
-def save_data_to_storage(data: Union[BinaryIO, str], dataset_name) -> list[str]:
-    if not isinstance(data, list):
-        # Convert data to a list as we work with lists further down.
-        data = [data]
-
-    file_paths = []
-
-    for data_item in data:
-        file_path = save_data_item_to_storage(data_item, dataset_name)
-        file_paths.append(file_path)
-
-    return file_paths
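
After this patch every caller goes through the single ingest_data task: it accepts raw data items (local file paths, upload file objects, plain text, or llama_index documents), saves each item to storage, writes its metadata, and only then hands the resulting file paths to the dlt pipeline. A minimal usage sketch under those assumptions; the file path and dataset name below are illustrative only, everything else comes from the modules touched in this patch:

    import asyncio

    from cognee.modules.users.methods import get_default_user
    from cognee.tasks.ingestion import ingest_data


    async def main():
        # Assumes a default user has been created; the path and dataset name are placeholders.
        user = await get_default_user()
        # A single path string is accepted; data_storing wraps it in a list,
        # stores it, records metadata, then runs the dlt "file_metadata" load.
        run_info = await ingest_data("/tmp/example.txt", "example_dataset", user)
        print(run_info)


    asyncio.run(main())

Deduplication rests on ingestion.identify hashing the file contents together with the owner id, so re-adding the same file for the same user updates the existing Data row instead of creating a duplicate.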