feat: split add into tasks and use pipeline architecture (#141)

* feat: split add into tasks and use pipeline architecture
Boris 2024-09-30 14:09:20 +02:00 committed by GitHub
parent 56868d8a6f
commit 01582d7a55
10 changed files with 151 additions and 9 deletions


@@ -1,6 +1,6 @@
 from .api.v1.config.config import config
-from .api.v1.add.add import add
-from .api.v1.cognify.cognify_v2 import cognify
+from .api.v1.add import add
+from .api.v1.cognify import cognify
 from .api.v1.datasets.datasets import datasets
 from .api.v1.search.search import search, SearchType
 from .api.v1.prune import prune


@@ -1 +1 @@
-from .add import add
+from .add_v2 import add


@@ -0,0 +1,22 @@
from typing import Union, BinaryIO

from cognee.modules.users.models import User
from cognee.modules.users.methods import get_default_user
from cognee.modules.pipelines import run_tasks, Task
from cognee.tasks.ingestion import save_data_to_storage, ingest_data
from cognee.infrastructure.databases.relational import create_db_and_tables

async def add(data: Union[BinaryIO, list[BinaryIO], str, list[str]], dataset_name: str = "main_dataset", user: User = None):
    await create_db_and_tables()

    if user is None:
        user = await get_default_user()

    tasks = [
        Task(save_data_to_storage, dataset_name),
        Task(ingest_data, dataset_name, user)
    ]

    pipeline = run_tasks(tasks, data, "add_pipeline")
    async for result in pipeline:
        print(result)
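For orientation, a minimal usage sketch of the new task-based add, assuming the top-level import shown above; the dataset name and input text are illustrative:

import asyncio
import cognee

async def main():
    # The rewritten add() builds a save_data_to_storage -> ingest_data pipeline
    # and iterates over its results.
    await cognee.add("Sample text to ingest.", dataset_name="example_dataset")

asyncio.run(main())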


@@ -0,0 +1 @@
from .cognify_v2 import cognify


@@ -2,15 +2,13 @@ from io import BufferedReader
 from typing import Union, BinaryIO
 from .exceptions import IngestionException
 from .data_types import TextData, BinaryData
+from tempfile import SpooledTemporaryFile

 def classify(data: Union[str, BinaryIO], filename: str = None):
     if isinstance(data, str):
         return TextData(data)

-    if isinstance(data, BufferedReader):
+    if isinstance(data, BufferedReader) or isinstance(data, SpooledTemporaryFile):
         return BinaryData(data, data.name.split("/")[-1] if data.name else filename)

-    if hasattr(data, "file"):
-        return BinaryData(data.file, filename)
-
     raise IngestionException(f"Type of data sent to classify(data: Union[str, BinaryIO) not supported: {type(data)}")


@@ -129,7 +129,7 @@ async def run_tasks_base(tasks: [Task], data = None, user: User = None):
                 "task_name": running_task.executable.__name__,
             })
             raise error
     elif inspect.isfunction(running_task.executable):
         logger.info("Function task started: `%s`", running_task.executable.__name__)
         send_telemetry("Function Task Started", user.id, {


@@ -26,7 +26,7 @@ class Task():
             self.task_config["batch_size"] = 1

     def run(self, *args, **kwargs):
-        combined_args = self.default_params["args"] + args
+        combined_args = args + self.default_params["args"]
         combined_kwargs = { **self.default_params["kwargs"], **kwargs }

         return self.executable(*combined_args, **combined_kwargs)
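The argument-order swap matters for how pipeline data reaches each task: arguments bound at construction (e.g. dataset_name, user) now trail the data passed in at run time, matching the new task signatures. A small sketch of the assumed behavior, inferred from this diff (it assumes Task(fn, *args) stores *args as default_params["args"]):

from cognee.modules.pipelines import Task
from cognee.tasks.ingestion import save_data_to_storage

task = Task(save_data_to_storage, "main_dataset")  # "main_dataset" becomes a bound default arg

# With the new ordering, run-time args lead and bound defaults trail:
task.run(["file:///tmp/example.txt"])
# -> save_data_to_storage(["file:///tmp/example.txt"], "main_dataset")

# The old ordering would have produced
# save_data_to_storage("main_dataset", ["file:///tmp/example.txt"]),
# which no longer matches the save_data_to_storage(data, dataset_name) signature.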


@@ -0,0 +1,2 @@
from .ingest_data import ingest_data
from .save_data_to_storage import save_data_to_storage


@@ -0,0 +1,92 @@
import dlt
import cognee.modules.ingestion as ingestion

from cognee.shared.utils import send_telemetry
from cognee.modules.users.models import User
from cognee.infrastructure.databases.relational import get_relational_config, get_relational_engine
from cognee.modules.data.methods import create_dataset
from cognee.modules.users.permissions.methods import give_permission_on_document

async def ingest_data(file_paths: list[str], dataset_name: str, user: User):
    relational_config = get_relational_config()

    destination = dlt.destinations.sqlalchemy(
        credentials = {
            "host": relational_config.db_host,
            "port": relational_config.db_port,
            "username": relational_config.db_username,
            "password": relational_config.db_password,
            "database": relational_config.db_name,
            "drivername": relational_config.db_provider,
        },
    )

    pipeline = dlt.pipeline(
        pipeline_name = "file_load_from_filesystem",
        destination = destination,
    )

    @dlt.resource(standalone = True, merge_key = "id")
    async def data_resources(file_paths: str, user: User):
        for file_path in file_paths:
            with open(file_path.replace("file://", ""), mode = "rb") as file:
                classified_data = ingestion.classify(file)

                data_id = ingestion.identify(classified_data)

                file_metadata = classified_data.get_metadata()

                from sqlalchemy import select
                from cognee.modules.data.models import Data

                db_engine = get_relational_engine()

                async with db_engine.get_async_session() as session:
                    dataset = await create_dataset(dataset_name, user.id, session)

                    data = (await session.execute(
                        select(Data).filter(Data.id == data_id)
                    )).scalar_one_or_none()

                    if data is not None:
                        data.name = file_metadata["name"]
                        data.raw_data_location = file_metadata["file_path"]
                        data.extension = file_metadata["extension"]
                        data.mime_type = file_metadata["mime_type"]

                        await session.merge(data)
                        await session.commit()
                    else:
                        data = Data(
                            id = data_id,
                            name = file_metadata["name"],
                            raw_data_location = file_metadata["file_path"],
                            extension = file_metadata["extension"],
                            mime_type = file_metadata["mime_type"],
                        )

                        dataset.data.append(data)
                        await session.commit()

                yield {
                    "id": data_id,
                    "name": file_metadata["name"],
                    "file_path": file_metadata["file_path"],
                    "extension": file_metadata["extension"],
                    "mime_type": file_metadata["mime_type"],
                }

                await give_permission_on_document(user, data_id, "read")
                await give_permission_on_document(user, data_id, "write")

    send_telemetry("cognee.add EXECUTION STARTED", user_id = user.id)

    run_info = pipeline.run(
        data_resources(file_paths, user),
        table_name = "file_metadata",
        dataset_name = dataset_name,
        write_disposition = "merge",
    )

    send_telemetry("cognee.add EXECUTION COMPLETED", user_id = user.id)

    return run_info
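A rough sketch of how this task could be exercised on its own; it normally runs as the second step of the add pipeline, and the file path and dataset name here are illustrative (the relational store must already be configured):

import asyncio
from cognee.modules.users.methods import get_default_user
from cognee.tasks.ingestion import ingest_data

async def main():
    user = await get_default_user()
    # Loads file metadata into the relational store via the dlt pipeline above.
    run_info = await ingest_data(["/tmp/example.txt"], "example_dataset", user)
    print(run_info)

asyncio.run(main())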


@@ -0,0 +1,27 @@
from typing import Union, BinaryIO

from cognee.modules.ingestion import save_data_to_file

def save_data_to_storage(data: Union[BinaryIO, str], dataset_name) -> list[str]:
    if not isinstance(data, list):
        # Convert data to a list as we work with lists further down.
        data = [data]

    file_paths = []

    for data_item in data:
        # data is a file object coming from upload.
        if hasattr(data_item, "file"):
            file_path = save_data_to_file(data_item.file, dataset_name, filename = data_item.filename)
            file_paths.append(file_path)

        if isinstance(data_item, str):
            # data is a file path
            if data_item.startswith("file://") or data_item.startswith("/"):
                file_paths.append(data_item.replace("file://", ""))
            # data is text
            else:
                file_path = save_data_to_file(data_item, dataset_name)
                file_paths.append(file_path)

    return file_paths
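To illustrate the three kinds of input the task normalizes into local file paths (the paths, text, and dataset name below are illustrative):

from cognee.tasks.ingestion import save_data_to_storage

file_paths = save_data_to_storage(
    [
        "/data/report.pdf",            # absolute path: passed through unchanged
        "file:///data/notes.txt",      # file:// URI: prefix stripped
        "Raw text to be ingested.",    # plain text: written to a new file first
    ],
    "example_dataset",
)
print(file_paths)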