fix: Resolve issue with Data object when running cognify pipeline
This commit is contained in:
parent
af2b0735f6
commit
d98253c28c
1 changed files with 14 additions and 8 deletions
|
|
@ -8,6 +8,7 @@ from functools import wraps
|
||||||
from cognee.infrastructure.databases.relational import get_relational_engine
|
from cognee.infrastructure.databases.relational import get_relational_engine
|
||||||
from cognee.modules.pipelines.operations.run_tasks_distributed import run_tasks_distributed
|
from cognee.modules.pipelines.operations.run_tasks_distributed import run_tasks_distributed
|
||||||
from cognee.modules.users.models import User
|
from cognee.modules.users.models import User
|
||||||
|
from cognee.modules.data.models import Data
|
||||||
from cognee.modules.ingestion.methods import get_s3_fs, open_data_file
|
from cognee.modules.ingestion.methods import get_s3_fs, open_data_file
|
||||||
from cognee.shared.logging_utils import get_logger
|
from cognee.shared.logging_utils import get_logger
|
||||||
from cognee.modules.users.methods import get_default_user
|
from cognee.modules.users.methods import get_default_user
|
||||||
|
|
@ -89,21 +90,26 @@ async def run_tasks(
|
||||||
try:
|
try:
|
||||||
if not isinstance(data, list):
|
if not isinstance(data, list):
|
||||||
data = [data]
|
data = [data]
|
||||||
data = await resolve_data_directories(data)
|
|
||||||
|
|
||||||
# TODO: Convert to async gather task instead of for loop (just make sure it can work there were some issues when async gathering datasets)
|
# TODO: Convert to async gather task instead of for loop (just make sure it can work there were some issues when async gathering datasets)
|
||||||
for data_item in data:
|
for data_item in data:
|
||||||
file_path = await save_data_item_to_storage(data_item, dataset.name)
|
# If data is being added to Cognee for the first time calculate the id of the data
|
||||||
# Ingest data and add metadata
|
if not isinstance(data_item, Data):
|
||||||
with open_data_file(file_path, s3fs=fs) as file:
|
data = await resolve_data_directories(data)
|
||||||
classified_data = ingestion.classify(file, s3fs=fs)
|
file_path = await save_data_item_to_storage(data_item, dataset.name)
|
||||||
# data_id is the hash of file contents + owner id to avoid duplicate data
|
# Ingest data and add metadata
|
||||||
data_id = ingestion.identify(classified_data, user)
|
with open_data_file(file_path, s3fs=fs) as file:
|
||||||
|
classified_data = ingestion.classify(file, s3fs=fs)
|
||||||
|
# data_id is the hash of file contents + owner id to avoid duplicate data
|
||||||
|
data_id = ingestion.identify(classified_data, user)
|
||||||
|
else:
|
||||||
|
# If data was already processed by Cognee get data id
|
||||||
|
data_id = data_item.id
|
||||||
|
|
||||||
try:
|
try:
|
||||||
async for result in run_tasks_with_telemetry(
|
async for result in run_tasks_with_telemetry(
|
||||||
tasks=tasks,
|
tasks=tasks,
|
||||||
data=data_item,
|
data=[data_item],
|
||||||
user=user,
|
user=user,
|
||||||
pipeline_name=pipeline_id,
|
pipeline_name=pipeline_id,
|
||||||
context=context,
|
context=context,
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue