refactor: Refactor ingestion to only have one ingestion task
This commit is contained in:
parent
75bc7f67eb
commit
0c7c1d7503
8 changed files with 99 additions and 243 deletions
|
|
@ -2,7 +2,7 @@ from typing import Union, BinaryIO
|
||||||
from cognee.modules.users.models import User
|
from cognee.modules.users.models import User
|
||||||
from cognee.modules.users.methods import get_default_user
|
from cognee.modules.users.methods import get_default_user
|
||||||
from cognee.modules.pipelines import run_tasks, Task
|
from cognee.modules.pipelines import run_tasks, Task
|
||||||
from cognee.tasks.ingestion import ingest_data_with_metadata, resolve_data_directories
|
from cognee.tasks.ingestion import ingest_data, resolve_data_directories
|
||||||
from cognee.infrastructure.databases.relational import (
|
from cognee.infrastructure.databases.relational import (
|
||||||
create_db_and_tables as create_relational_db_and_tables,
|
create_db_and_tables as create_relational_db_and_tables,
|
||||||
)
|
)
|
||||||
|
|
@ -22,7 +22,7 @@ async def add(
|
||||||
if user is None:
|
if user is None:
|
||||||
user = await get_default_user()
|
user = await get_default_user()
|
||||||
|
|
||||||
tasks = [Task(resolve_data_directories), Task(ingest_data_with_metadata, dataset_name, user)]
|
tasks = [Task(resolve_data_directories), Task(ingest_data, dataset_name, user)]
|
||||||
|
|
||||||
pipeline = run_tasks(tasks, data, "add_pipeline")
|
pipeline = run_tasks(tasks, data, "add_pipeline")
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,7 @@ from cognee.modules.users.methods import get_default_user
|
||||||
from cognee.shared.data_models import KnowledgeGraph, MonitoringTool
|
from cognee.shared.data_models import KnowledgeGraph, MonitoringTool
|
||||||
from cognee.tasks.documents import classify_documents, extract_chunks_from_documents
|
from cognee.tasks.documents import classify_documents, extract_chunks_from_documents
|
||||||
from cognee.tasks.graph import extract_graph_from_data
|
from cognee.tasks.graph import extract_graph_from_data
|
||||||
from cognee.tasks.ingestion import ingest_data_with_metadata
|
from cognee.tasks.ingestion import ingest_data
|
||||||
from cognee.tasks.repo_processor import (
|
from cognee.tasks.repo_processor import (
|
||||||
enrich_dependency_graph,
|
enrich_dependency_graph,
|
||||||
expand_dependency_graph,
|
expand_dependency_graph,
|
||||||
|
|
@ -68,7 +68,7 @@ async def run_code_graph_pipeline(repo_path, include_docs=True):
|
||||||
if include_docs:
|
if include_docs:
|
||||||
non_code_tasks = [
|
non_code_tasks = [
|
||||||
Task(get_non_py_files, task_config={"batch_size": 50}),
|
Task(get_non_py_files, task_config={"batch_size": 50}),
|
||||||
Task(ingest_data_with_metadata, dataset_name="repo_docs", user=user),
|
Task(ingest_data, dataset_name="repo_docs", user=user),
|
||||||
Task(get_data_list_for_user, dataset_name="repo_docs", user=user),
|
Task(get_data_list_for_user, dataset_name="repo_docs", user=user),
|
||||||
Task(classify_documents),
|
Task(classify_documents),
|
||||||
Task(extract_chunks_from_documents, max_tokens=cognee_config.max_tokens),
|
Task(extract_chunks_from_documents, max_tokens=cognee_config.max_tokens),
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,3 @@
|
||||||
from .ingest_data import ingest_data
|
|
||||||
from .save_data_to_storage import save_data_to_storage
|
|
||||||
from .save_data_item_to_storage import save_data_item_to_storage
|
from .save_data_item_to_storage import save_data_item_to_storage
|
||||||
from .save_data_item_with_metadata_to_storage import save_data_item_with_metadata_to_storage
|
from .ingest_data import ingest_data
|
||||||
from .ingest_data_with_metadata import ingest_data_with_metadata
|
|
||||||
from .resolve_data_directories import resolve_data_directories
|
from .resolve_data_directories import resolve_data_directories
|
||||||
|
|
|
||||||
|
|
@ -1,16 +1,21 @@
|
||||||
|
from typing import Any, List
|
||||||
|
|
||||||
import dlt
|
import dlt
|
||||||
import cognee.modules.ingestion as ingestion
|
import cognee.modules.ingestion as ingestion
|
||||||
|
|
||||||
from uuid import UUID
|
|
||||||
from cognee.shared.utils import send_telemetry
|
|
||||||
from cognee.modules.users.models import User
|
|
||||||
from cognee.infrastructure.databases.relational import get_relational_engine
|
from cognee.infrastructure.databases.relational import get_relational_engine
|
||||||
from cognee.modules.data.methods import create_dataset
|
from cognee.modules.data.methods import create_dataset
|
||||||
|
from cognee.modules.data.models.DatasetData import DatasetData
|
||||||
|
from cognee.modules.users.models import User
|
||||||
from cognee.modules.users.permissions.methods import give_permission_on_document
|
from cognee.modules.users.permissions.methods import give_permission_on_document
|
||||||
|
from cognee.shared.utils import send_telemetry
|
||||||
|
from cognee.modules.data.operations.write_metadata import write_metadata
|
||||||
from .get_dlt_destination import get_dlt_destination
|
from .get_dlt_destination import get_dlt_destination
|
||||||
|
from .save_data_item_to_storage import (
|
||||||
|
save_data_item_to_storage,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def ingest_data(file_paths: list[str], dataset_name: str, user: User):
|
async def ingest_data(data: Any, dataset_name: str, user: User):
|
||||||
destination = get_dlt_destination()
|
destination = get_dlt_destination()
|
||||||
|
|
||||||
pipeline = dlt.pipeline(
|
pipeline = dlt.pipeline(
|
||||||
|
|
@ -18,12 +23,12 @@ async def ingest_data(file_paths: list[str], dataset_name: str, user: User):
|
||||||
destination=destination,
|
destination=destination,
|
||||||
)
|
)
|
||||||
|
|
||||||
@dlt.resource(standalone=True, merge_key="id")
|
@dlt.resource(standalone=True, primary_key="id", merge_key="id")
|
||||||
async def data_resources(file_paths: str):
|
async def data_resources(file_paths: List[str], user: User):
|
||||||
for file_path in file_paths:
|
for file_path in file_paths:
|
||||||
with open(file_path.replace("file://", ""), mode="rb") as file:
|
with open(file_path.replace("file://", ""), mode="rb") as file:
|
||||||
classified_data = ingestion.classify(file)
|
classified_data = ingestion.classify(file)
|
||||||
data_id = ingestion.identify(classified_data)
|
data_id = ingestion.identify(classified_data, user)
|
||||||
file_metadata = classified_data.get_metadata()
|
file_metadata = classified_data.get_metadata()
|
||||||
yield {
|
yield {
|
||||||
"id": data_id,
|
"id": data_id,
|
||||||
|
|
@ -31,71 +36,110 @@ async def ingest_data(file_paths: list[str], dataset_name: str, user: User):
|
||||||
"file_path": file_metadata["file_path"],
|
"file_path": file_metadata["file_path"],
|
||||||
"extension": file_metadata["extension"],
|
"extension": file_metadata["extension"],
|
||||||
"mime_type": file_metadata["mime_type"],
|
"mime_type": file_metadata["mime_type"],
|
||||||
|
"content_hash": file_metadata["content_hash"],
|
||||||
|
"owner_id": str(user.id),
|
||||||
}
|
}
|
||||||
|
|
||||||
async def data_storing(table_name, dataset_name, user: User):
|
async def data_storing(data: Any, dataset_name: str, user: User):
|
||||||
db_engine = get_relational_engine()
|
if not isinstance(data, list):
|
||||||
|
# Convert data to a list as we work with lists further down.
|
||||||
|
data = [data]
|
||||||
|
|
||||||
|
file_paths = []
|
||||||
|
|
||||||
|
# Process data
|
||||||
|
for data_item in data:
|
||||||
|
file_path = await save_data_item_to_storage(data_item, dataset_name)
|
||||||
|
|
||||||
|
file_paths.append(file_path)
|
||||||
|
|
||||||
|
# Ingest data and add metadata
|
||||||
|
with open(file_path.replace("file://", ""), mode="rb") as file:
|
||||||
|
classified_data = ingestion.classify(file)
|
||||||
|
|
||||||
|
# data_id is the hash of file contents + owner id to avoid duplicate data
|
||||||
|
data_id = ingestion.identify(classified_data, user)
|
||||||
|
|
||||||
|
file_metadata = classified_data.get_metadata()
|
||||||
|
|
||||||
async with db_engine.get_async_session() as session:
|
|
||||||
# Read metadata stored with dlt
|
|
||||||
files_metadata = await db_engine.get_all_data_from_table(table_name, dataset_name)
|
|
||||||
for file_metadata in files_metadata:
|
|
||||||
from sqlalchemy import select
|
from sqlalchemy import select
|
||||||
|
|
||||||
from cognee.modules.data.models import Data
|
from cognee.modules.data.models import Data
|
||||||
|
|
||||||
dataset = await create_dataset(dataset_name, user.id, session)
|
db_engine = get_relational_engine()
|
||||||
|
|
||||||
data = (
|
async with db_engine.get_async_session() as session:
|
||||||
await session.execute(select(Data).filter(Data.id == UUID(file_metadata["id"])))
|
dataset = await create_dataset(dataset_name, user.id, session)
|
||||||
).scalar_one_or_none()
|
|
||||||
|
|
||||||
if data is not None:
|
# Check to see if data should be updated
|
||||||
data.name = file_metadata["name"]
|
data_point = (
|
||||||
data.raw_data_location = file_metadata["file_path"]
|
await session.execute(select(Data).filter(Data.id == data_id))
|
||||||
data.extension = file_metadata["extension"]
|
).scalar_one_or_none()
|
||||||
data.mime_type = file_metadata["mime_type"]
|
|
||||||
|
if data_point is not None:
|
||||||
|
data_point.name = file_metadata["name"]
|
||||||
|
data_point.raw_data_location = file_metadata["file_path"]
|
||||||
|
data_point.extension = file_metadata["extension"]
|
||||||
|
data_point.mime_type = file_metadata["mime_type"]
|
||||||
|
data_point.owner_id = user.id
|
||||||
|
data_point.content_hash = file_metadata["content_hash"]
|
||||||
|
await session.merge(data_point)
|
||||||
|
else:
|
||||||
|
data_point = Data(
|
||||||
|
id=data_id,
|
||||||
|
name=file_metadata["name"],
|
||||||
|
raw_data_location=file_metadata["file_path"],
|
||||||
|
extension=file_metadata["extension"],
|
||||||
|
mime_type=file_metadata["mime_type"],
|
||||||
|
owner_id=user.id,
|
||||||
|
content_hash=file_metadata["content_hash"],
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check if data is already in dataset
|
||||||
|
dataset_data = (
|
||||||
|
await session.execute(
|
||||||
|
select(DatasetData).filter(
|
||||||
|
DatasetData.data_id == data_id, DatasetData.dataset_id == dataset.id
|
||||||
|
)
|
||||||
|
)
|
||||||
|
).scalar_one_or_none()
|
||||||
|
# If data is not present in dataset add it
|
||||||
|
if dataset_data is None:
|
||||||
|
dataset.data.append(data_point)
|
||||||
|
|
||||||
await session.merge(data)
|
|
||||||
await session.commit()
|
await session.commit()
|
||||||
else:
|
await write_metadata(data_item, data_point.id, file_metadata)
|
||||||
data = Data(
|
|
||||||
id=UUID(file_metadata["id"]),
|
|
||||||
name=file_metadata["name"],
|
|
||||||
raw_data_location=file_metadata["file_path"],
|
|
||||||
extension=file_metadata["extension"],
|
|
||||||
mime_type=file_metadata["mime_type"],
|
|
||||||
)
|
|
||||||
|
|
||||||
dataset.data.append(data)
|
await give_permission_on_document(user, data_id, "read")
|
||||||
await session.commit()
|
await give_permission_on_document(user, data_id, "write")
|
||||||
|
return file_paths
|
||||||
await give_permission_on_document(user, UUID(file_metadata["id"]), "read")
|
|
||||||
await give_permission_on_document(user, UUID(file_metadata["id"]), "write")
|
|
||||||
|
|
||||||
send_telemetry("cognee.add EXECUTION STARTED", user_id=user.id)
|
send_telemetry("cognee.add EXECUTION STARTED", user_id=user.id)
|
||||||
|
|
||||||
db_engine = get_relational_engine()
|
db_engine = get_relational_engine()
|
||||||
|
|
||||||
|
file_paths = await data_storing(data, dataset_name, user)
|
||||||
|
|
||||||
# Note: DLT pipeline has its own event loop, therefore objects created in another event loop
|
# Note: DLT pipeline has its own event loop, therefore objects created in another event loop
|
||||||
# can't be used inside the pipeline
|
# can't be used inside the pipeline
|
||||||
if db_engine.engine.dialect.name == "sqlite":
|
if db_engine.engine.dialect.name == "sqlite":
|
||||||
# To use sqlite with dlt dataset_name must be set to "main".
|
# To use sqlite with dlt dataset_name must be set to "main".
|
||||||
# Sqlite doesn't support schemas
|
# Sqlite doesn't support schemas
|
||||||
run_info = pipeline.run(
|
run_info = pipeline.run(
|
||||||
data_resources(file_paths),
|
data_resources(file_paths, user),
|
||||||
table_name="file_metadata",
|
table_name="file_metadata",
|
||||||
dataset_name="main",
|
dataset_name="main",
|
||||||
write_disposition="merge",
|
write_disposition="merge",
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
|
# Data should be stored in the same schema to allow deduplication
|
||||||
run_info = pipeline.run(
|
run_info = pipeline.run(
|
||||||
data_resources(file_paths),
|
data_resources(file_paths, user),
|
||||||
table_name="file_metadata",
|
table_name="file_metadata",
|
||||||
dataset_name=dataset_name,
|
dataset_name="public",
|
||||||
write_disposition="merge",
|
write_disposition="merge",
|
||||||
)
|
)
|
||||||
|
|
||||||
await data_storing("file_metadata", dataset_name, user)
|
|
||||||
send_telemetry("cognee.add EXECUTION COMPLETED", user_id=user.id)
|
send_telemetry("cognee.add EXECUTION COMPLETED", user_id=user.id)
|
||||||
|
|
||||||
return run_info
|
return run_info
|
||||||
|
|
|
||||||
|
|
@ -1,145 +0,0 @@
|
||||||
from typing import Any, List
|
|
||||||
|
|
||||||
import dlt
|
|
||||||
import cognee.modules.ingestion as ingestion
|
|
||||||
from cognee.infrastructure.databases.relational import get_relational_engine
|
|
||||||
from cognee.modules.data.methods import create_dataset
|
|
||||||
from cognee.modules.data.models.DatasetData import DatasetData
|
|
||||||
from cognee.modules.users.models import User
|
|
||||||
from cognee.modules.users.permissions.methods import give_permission_on_document
|
|
||||||
from cognee.shared.utils import send_telemetry
|
|
||||||
from cognee.modules.data.operations.write_metadata import write_metadata
|
|
||||||
from .get_dlt_destination import get_dlt_destination
|
|
||||||
from .save_data_item_with_metadata_to_storage import (
|
|
||||||
save_data_item_with_metadata_to_storage,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
|
|
||||||
destination = get_dlt_destination()
|
|
||||||
|
|
||||||
pipeline = dlt.pipeline(
|
|
||||||
pipeline_name="file_load_from_filesystem",
|
|
||||||
destination=destination,
|
|
||||||
)
|
|
||||||
|
|
||||||
@dlt.resource(standalone=True, primary_key="id", merge_key="id")
|
|
||||||
async def data_resources(file_paths: List[str], user: User):
|
|
||||||
for file_path in file_paths:
|
|
||||||
with open(file_path.replace("file://", ""), mode="rb") as file:
|
|
||||||
classified_data = ingestion.classify(file)
|
|
||||||
data_id = ingestion.identify(classified_data, user)
|
|
||||||
file_metadata = classified_data.get_metadata()
|
|
||||||
yield {
|
|
||||||
"id": data_id,
|
|
||||||
"name": file_metadata["name"],
|
|
||||||
"file_path": file_metadata["file_path"],
|
|
||||||
"extension": file_metadata["extension"],
|
|
||||||
"mime_type": file_metadata["mime_type"],
|
|
||||||
"content_hash": file_metadata["content_hash"],
|
|
||||||
"owner_id": str(user.id),
|
|
||||||
}
|
|
||||||
|
|
||||||
async def data_storing(data: Any, dataset_name: str, user: User):
|
|
||||||
if not isinstance(data, list):
|
|
||||||
# Convert data to a list as we work with lists further down.
|
|
||||||
data = [data]
|
|
||||||
|
|
||||||
file_paths = []
|
|
||||||
|
|
||||||
# Process data
|
|
||||||
for data_item in data:
|
|
||||||
file_path = await save_data_item_with_metadata_to_storage(data_item, dataset_name)
|
|
||||||
|
|
||||||
file_paths.append(file_path)
|
|
||||||
|
|
||||||
# Ingest data and add metadata
|
|
||||||
with open(file_path.replace("file://", ""), mode="rb") as file:
|
|
||||||
classified_data = ingestion.classify(file)
|
|
||||||
|
|
||||||
# data_id is the hash of file contents + owner id to avoid duplicate data
|
|
||||||
data_id = ingestion.identify(classified_data, user)
|
|
||||||
|
|
||||||
file_metadata = classified_data.get_metadata()
|
|
||||||
|
|
||||||
from sqlalchemy import select
|
|
||||||
|
|
||||||
from cognee.modules.data.models import Data
|
|
||||||
|
|
||||||
db_engine = get_relational_engine()
|
|
||||||
|
|
||||||
async with db_engine.get_async_session() as session:
|
|
||||||
dataset = await create_dataset(dataset_name, user.id, session)
|
|
||||||
|
|
||||||
# Check to see if data should be updated
|
|
||||||
data_point = (
|
|
||||||
await session.execute(select(Data).filter(Data.id == data_id))
|
|
||||||
).scalar_one_or_none()
|
|
||||||
|
|
||||||
if data_point is not None:
|
|
||||||
data_point.name = file_metadata["name"]
|
|
||||||
data_point.raw_data_location = file_metadata["file_path"]
|
|
||||||
data_point.extension = file_metadata["extension"]
|
|
||||||
data_point.mime_type = file_metadata["mime_type"]
|
|
||||||
data_point.owner_id = user.id
|
|
||||||
data_point.content_hash = file_metadata["content_hash"]
|
|
||||||
await session.merge(data_point)
|
|
||||||
else:
|
|
||||||
data_point = Data(
|
|
||||||
id=data_id,
|
|
||||||
name=file_metadata["name"],
|
|
||||||
raw_data_location=file_metadata["file_path"],
|
|
||||||
extension=file_metadata["extension"],
|
|
||||||
mime_type=file_metadata["mime_type"],
|
|
||||||
owner_id=user.id,
|
|
||||||
content_hash=file_metadata["content_hash"],
|
|
||||||
)
|
|
||||||
|
|
||||||
# Check if data is already in dataset
|
|
||||||
dataset_data = (
|
|
||||||
await session.execute(
|
|
||||||
select(DatasetData).filter(
|
|
||||||
DatasetData.data_id == data_id, DatasetData.dataset_id == dataset.id
|
|
||||||
)
|
|
||||||
)
|
|
||||||
).scalar_one_or_none()
|
|
||||||
# If data is not present in dataset add it
|
|
||||||
if dataset_data is None:
|
|
||||||
dataset.data.append(data_point)
|
|
||||||
|
|
||||||
await session.commit()
|
|
||||||
await write_metadata(data_item, data_point.id, file_metadata)
|
|
||||||
|
|
||||||
await give_permission_on_document(user, data_id, "read")
|
|
||||||
await give_permission_on_document(user, data_id, "write")
|
|
||||||
return file_paths
|
|
||||||
|
|
||||||
send_telemetry("cognee.add EXECUTION STARTED", user_id=user.id)
|
|
||||||
|
|
||||||
db_engine = get_relational_engine()
|
|
||||||
|
|
||||||
file_paths = await data_storing(data, dataset_name, user)
|
|
||||||
|
|
||||||
# Note: DLT pipeline has its own event loop, therefore objects created in another event loop
|
|
||||||
# can't be used inside the pipeline
|
|
||||||
if db_engine.engine.dialect.name == "sqlite":
|
|
||||||
# To use sqlite with dlt dataset_name must be set to "main".
|
|
||||||
# Sqlite doesn't support schemas
|
|
||||||
run_info = pipeline.run(
|
|
||||||
data_resources(file_paths, user),
|
|
||||||
table_name="file_metadata",
|
|
||||||
dataset_name="main",
|
|
||||||
write_disposition="merge",
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
# Data should be stored in the same schema to allow deduplication
|
|
||||||
run_info = pipeline.run(
|
|
||||||
data_resources(file_paths, user),
|
|
||||||
table_name="file_metadata",
|
|
||||||
dataset_name="public",
|
|
||||||
write_disposition="merge",
|
|
||||||
)
|
|
||||||
|
|
||||||
send_telemetry("cognee.add EXECUTION COMPLETED", user_id=user.id)
|
|
||||||
|
|
||||||
return run_info
|
|
||||||
|
|
@ -1,12 +1,18 @@
|
||||||
from typing import Union, BinaryIO
|
from typing import Union, BinaryIO, Any
|
||||||
|
|
||||||
from cognee.modules.ingestion.exceptions import IngestionError
|
from cognee.modules.ingestion.exceptions import IngestionError
|
||||||
from cognee.modules.ingestion import save_data_to_file
|
from cognee.modules.ingestion import save_data_to_file
|
||||||
|
|
||||||
|
|
||||||
def save_data_item_to_storage(data_item: Union[BinaryIO, str], dataset_name: str) -> str:
|
async def save_data_item_to_storage(data_item: Union[BinaryIO, str, Any], dataset_name: str) -> str:
|
||||||
|
if "llama_index" in str(type(data_item)):
|
||||||
|
# Dynamic import is used because the llama_index module is optional.
|
||||||
|
from .transform_data import get_data_from_llama_index
|
||||||
|
|
||||||
|
file_path = get_data_from_llama_index(data_item, dataset_name)
|
||||||
|
|
||||||
# data is a file object coming from upload.
|
# data is a file object coming from upload.
|
||||||
if hasattr(data_item, "file"):
|
elif hasattr(data_item, "file"):
|
||||||
file_path = save_data_to_file(data_item.file, filename=data_item.filename)
|
file_path = save_data_to_file(data_item.file, filename=data_item.filename)
|
||||||
|
|
||||||
elif isinstance(data_item, str):
|
elif isinstance(data_item, str):
|
||||||
|
|
|
||||||
|
|
@ -1,30 +0,0 @@
|
||||||
from typing import Union, BinaryIO, Any
|
|
||||||
|
|
||||||
from cognee.modules.ingestion.exceptions import IngestionError
|
|
||||||
from cognee.modules.ingestion import save_data_to_file
|
|
||||||
|
|
||||||
|
|
||||||
async def save_data_item_with_metadata_to_storage(
|
|
||||||
data_item: Union[BinaryIO, str, Any], dataset_name: str
|
|
||||||
) -> str:
|
|
||||||
if "llama_index" in str(type(data_item)):
|
|
||||||
# Dynamic import is used because the llama_index module is optional.
|
|
||||||
from .transform_data import get_data_from_llama_index
|
|
||||||
|
|
||||||
file_path = get_data_from_llama_index(data_item, dataset_name)
|
|
||||||
|
|
||||||
# data is a file object coming from upload.
|
|
||||||
elif hasattr(data_item, "file"):
|
|
||||||
file_path = save_data_to_file(data_item.file, filename=data_item.filename)
|
|
||||||
|
|
||||||
elif isinstance(data_item, str):
|
|
||||||
# data is a file path
|
|
||||||
if data_item.startswith("file://") or data_item.startswith("/"):
|
|
||||||
file_path = data_item.replace("file://", "")
|
|
||||||
# data is text
|
|
||||||
else:
|
|
||||||
file_path = save_data_to_file(data_item)
|
|
||||||
else:
|
|
||||||
raise IngestionError(message=f"Data type not supported: {type(data_item)}")
|
|
||||||
|
|
||||||
return file_path
|
|
||||||
|
|
@ -1,16 +0,0 @@
|
||||||
from typing import Union, BinaryIO
|
|
||||||
from cognee.tasks.ingestion.save_data_item_to_storage import save_data_item_to_storage
|
|
||||||
|
|
||||||
|
|
||||||
def save_data_to_storage(data: Union[BinaryIO, str], dataset_name) -> list[str]:
|
|
||||||
if not isinstance(data, list):
|
|
||||||
# Convert data to a list as we work with lists further down.
|
|
||||||
data = [data]
|
|
||||||
|
|
||||||
file_paths = []
|
|
||||||
|
|
||||||
for data_item in data:
|
|
||||||
file_path = save_data_item_to_storage(data_item, dataset_name)
|
|
||||||
file_paths.append(file_path)
|
|
||||||
|
|
||||||
return file_paths
|
|
||||||
Loading…
Add table
Reference in a new issue