added formatting
This commit is contained in:
parent
bf15790be8
commit
51fd2a51a8
2 changed files with 32 additions and 9 deletions
|
|
@ -98,10 +98,9 @@ async def run_tasks(
|
||||||
await session.execute(select(Data).filter(Data.id == data_id))
|
await session.execute(select(Data).filter(Data.id == data_id))
|
||||||
).scalar_one_or_none()
|
).scalar_one_or_none()
|
||||||
if data_point:
|
if data_point:
|
||||||
if (
|
if (data_point.pipeline_status or {}).get(pipeline_name, {}).get(
|
||||||
data_point.pipeline_status.get(pipeline_name, {}).get(str(dataset.id))
|
str(dataset.id)
|
||||||
== DataItemStatus.DATA_ITEM_PROCESSING_COMPLETED
|
) == DataItemStatus.DATA_ITEM_PROCESSING_COMPLETED:
|
||||||
):
|
|
||||||
yield {
|
yield {
|
||||||
"run_info": PipelineRunAlreadyCompleted(
|
"run_info": PipelineRunAlreadyCompleted(
|
||||||
pipeline_run_id=pipeline_run_id,
|
pipeline_run_id=pipeline_run_id,
|
||||||
|
|
@ -133,11 +132,20 @@ async def run_tasks(
|
||||||
data_point = (
|
data_point = (
|
||||||
await session.execute(select(Data).filter(Data.id == data_id))
|
await session.execute(select(Data).filter(Data.id == data_id))
|
||||||
).scalar_one_or_none()
|
).scalar_one_or_none()
|
||||||
data_point.pipeline_status[pipeline_name] = {
|
if data_point is not None:
|
||||||
str(dataset.id): DataItemStatus.DATA_ITEM_PROCESSING_COMPLETED
|
if data_point.pipeline_status is None:
|
||||||
}
|
data_point.pipeline_status = {}
|
||||||
await session.merge(data_point)
|
data_point.pipeline_status[pipeline_name] = {
|
||||||
await session.commit()
|
str(dataset.id): DataItemStatus.DATA_ITEM_PROCESSING_COMPLETED
|
||||||
|
}
|
||||||
|
await session.merge(data_point)
|
||||||
|
await session.commit()
|
||||||
|
else:
|
||||||
|
# Log warning if data point not found but don't fail the pipeline
|
||||||
|
logger = get_logger(__name__)
|
||||||
|
logger.warning(
|
||||||
|
f"Data point with ID {data_id} not found in database, skipping pipeline status update"
|
||||||
|
)
|
||||||
|
|
||||||
yield {
|
yield {
|
||||||
"run_info": PipelineRunCompleted(
|
"run_info": PipelineRunCompleted(
|
||||||
|
|
|
||||||
|
|
@ -227,6 +227,18 @@ async def plugin_ingest_data(
|
||||||
if "_" in content_identifier:
|
if "_" in content_identifier:
|
||||||
return content_identifier.split("_", 1)[-1]
|
return content_identifier.split("_", 1)[-1]
|
||||||
return content_identifier
|
return content_identifier
|
||||||
|
elif field_name == "file_size":
|
||||||
|
# Get file size from metadata or filesystem
|
||||||
|
if "file_size" in metadata:
|
||||||
|
return metadata["file_size"]
|
||||||
|
elif file_path:
|
||||||
|
import os
|
||||||
|
|
||||||
|
try:
|
||||||
|
return os.path.getsize(file_path)
|
||||||
|
except (OSError, TypeError):
|
||||||
|
return None
|
||||||
|
return None
|
||||||
|
|
||||||
return default_value
|
return default_value
|
||||||
|
|
||||||
|
|
@ -275,6 +287,9 @@ async def plugin_ingest_data(
|
||||||
content_hash=get_metadata_field(file_metadata, "content_hash"),
|
content_hash=get_metadata_field(file_metadata, "content_hash"),
|
||||||
external_metadata=ext_metadata,
|
external_metadata=ext_metadata,
|
||||||
node_set=json.dumps(node_set) if node_set else None,
|
node_set=json.dumps(node_set) if node_set else None,
|
||||||
|
data_size=get_metadata_field(file_metadata, "file_size"),
|
||||||
|
tenant_id=user.tenant_id if user.tenant_id else None,
|
||||||
|
pipeline_status={},
|
||||||
token_count=-1,
|
token_count=-1,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue