fix: Resolve issue with dlt for ingest_data_with_metadata
Resolve issue caused by dlt for ingest_data_with_metadata task
This commit is contained in:
parent 1709e926e2
commit 61aebf79e0
1 changed file with 45 additions and 18 deletions
@@ -15,7 +15,6 @@ from .save_data_item_with_metadata_to_storage import (
 )
 
-
 
 async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
     destination = get_dlt_destination()
 
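For context on the `destination` fetched above: dlt pipelines are built by handing that destination object to `dlt.pipeline(...)`, as the next hunk shows. A minimal sketch, with a placeholder "duckdb" destination and an illustrative pipeline name standing in for cognee's actual values:

import dlt

# Placeholder for whatever get_dlt_destination() returns in cognee;
# any destination dlt supports (e.g. "duckdb", "postgres") fits here.
destination = "duckdb"

pipeline = dlt.pipeline(
    pipeline_name="file_metadata_ingestion",  # illustrative name only
    destination=destination,
)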
@@ -24,18 +23,36 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
         destination = destination,
     )
 
-    @dlt.resource(standalone = True, merge_key = "id")
-    async def data_resources(data: Any, user: User):
+    @dlt.resource(standalone=True, merge_key="id")
+    async def data_resources(file_paths: str):
+        for file_path in file_paths:
+            with open(file_path.replace("file://", ""), mode="rb") as file:
+                classified_data = ingestion.classify(file)
+                data_id = ingestion.identify(classified_data)
+                file_metadata = classified_data.get_metadata()
+                yield {
+                    "id": data_id,
+                    "name": file_metadata["name"],
+                    "file_path": file_metadata["file_path"],
+                    "extension": file_metadata["extension"],
+                    "mime_type": file_metadata["mime_type"],
+                }
+
+    async def data_storing(data: Any, dataset_name: str, user: User):
         if not isinstance(data, list):
             # Convert data to a list as we work with lists further down.
             data = [data]
 
+        file_paths = []
+
         # Process data
         for data_item in data:
             file_path = await save_data_item_with_metadata_to_storage(
                 data_item, dataset_name
             )
 
+            file_paths.append(file_path)
+
             # Ingest data and add metadata
             with open(file_path.replace("file://", ""), mode = "rb") as file:
                 classified_data = ingestion.classify(file)
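The substantive change in this hunk: `data_resources` no longer does storage or database work itself; it only walks already-stored file paths and yields one metadata row per file, keyed on `id` for merge deduplication, while the new `data_storing` helper handles persistence. A self-contained sketch of that resource pattern, with cognee's `ingestion.classify`/`ingestion.identify` stubbed out by stdlib calls and a plain (non-async) generator for simplicity:

import os
import dlt

@dlt.resource(standalone=True, merge_key="id")
def file_metadata_rows(file_paths):
    # One row per file; the "merge" write disposition dedupes on "id".
    for file_path in file_paths:
        local_path = file_path.replace("file://", "")
        yield {
            "id": local_path,  # stand-in for ingestion.identify(...)
            "name": os.path.basename(local_path),
            "file_path": file_path,
            "extension": os.path.splitext(local_path)[1],
        }

pipeline = dlt.pipeline(
    pipeline_name="metadata_demo",
    destination="duckdb",
    dataset_name="demo_data",
)

run_info = pipeline.run(
    file_metadata_rows(["file:///tmp/example.txt"]),
    table_name="file_metadata",
    write_disposition="merge",
)
print(run_info)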
@@ -77,25 +94,35 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
                     await session.commit()
                     await write_metadata(data_item, data_point.id, file_metadata)
 
-            yield {
-                "id": data_id,
-                "name": file_metadata["name"],
-                "file_path": file_metadata["file_path"],
-                "extension": file_metadata["extension"],
-                "mime_type": file_metadata["mime_type"],
-            }
-
             await give_permission_on_document(user, data_id, "read")
             await give_permission_on_document(user, data_id, "write")
+
+        return file_paths
 
     send_telemetry("cognee.add EXECUTION STARTED", user_id=user.id)
-    run_info = pipeline.run(
-        data_resources(data, user),
-        table_name = "file_metadata",
-        dataset_name = dataset_name,
-        write_disposition = "merge",
-    )
+
+    db_engine = get_relational_engine()
+
+    file_paths = await data_storing(data, dataset_name, user)
+
+    # Note: DLT pipeline has its own event loop, therefore objects created in another event loop
+    # can't be used inside the pipeline
+    if db_engine.engine.dialect.name == "sqlite":
+        # To use sqlite with dlt dataset_name must be set to "main".
+        # Sqlite doesn't support schemas
+        run_info = pipeline.run(
+            data_resources(file_paths),
+            table_name="file_metadata",
+            dataset_name="main",
+            write_disposition="merge",
+        )
+    else:
+        run_info = pipeline.run(
+            data_resources(file_paths),
+            table_name="file_metadata",
+            dataset_name=dataset_name,
+            write_disposition="merge",
+        )
 
     send_telemetry("cognee.add EXECUTION COMPLETED", user_id=user.id)
 
     return run_info
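Why the sqlite branch above exists: for SQL destinations dlt materializes each `dataset_name` as a schema, and SQLite only has the built-in "main" schema, so any other name would fail. A sketch of the same dialect check against a throwaway SQLAlchemy engine (the in-memory URL and fallback dataset name are illustrative):

from sqlalchemy import create_engine

engine = create_engine("sqlite://")  # in-memory SQLite, for illustration

# Mirrors the branch in the diff: fall back to SQLite's built-in
# "main" schema; other dialects can use the requested dataset name.
dataset_name = "main" if engine.dialect.name == "sqlite" else "my_dataset"
print(dataset_name)  # -> "main"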