From 51fd2a51a812fe5fca3f66dc41ceb63f5269f36a Mon Sep 17 00:00:00 2001 From: vasilije Date: Sat, 2 Aug 2025 21:04:09 +0200 Subject: [PATCH] added formatting --- .../modules/pipelines/operations/run_tasks.py | 26 ++++++++++++------- cognee/tasks/ingestion/plugin_ingest_data.py | 15 +++++++++++ 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/cognee/modules/pipelines/operations/run_tasks.py b/cognee/modules/pipelines/operations/run_tasks.py index 1f503f7d2..474a0d3e1 100644 --- a/cognee/modules/pipelines/operations/run_tasks.py +++ b/cognee/modules/pipelines/operations/run_tasks.py @@ -98,10 +98,9 @@ async def run_tasks( await session.execute(select(Data).filter(Data.id == data_id)) ).scalar_one_or_none() if data_point: - if ( - data_point.pipeline_status.get(pipeline_name, {}).get(str(dataset.id)) - == DataItemStatus.DATA_ITEM_PROCESSING_COMPLETED - ): + if (data_point.pipeline_status or {}).get(pipeline_name, {}).get( + str(dataset.id) + ) == DataItemStatus.DATA_ITEM_PROCESSING_COMPLETED: yield { "run_info": PipelineRunAlreadyCompleted( pipeline_run_id=pipeline_run_id, @@ -133,11 +132,20 @@ async def run_tasks( data_point = ( await session.execute(select(Data).filter(Data.id == data_id)) ).scalar_one_or_none() - data_point.pipeline_status[pipeline_name] = { - str(dataset.id): DataItemStatus.DATA_ITEM_PROCESSING_COMPLETED - } - await session.merge(data_point) - await session.commit() + if data_point is not None: + if data_point.pipeline_status is None: + data_point.pipeline_status = {} + data_point.pipeline_status[pipeline_name] = { + str(dataset.id): DataItemStatus.DATA_ITEM_PROCESSING_COMPLETED + } + await session.merge(data_point) + await session.commit() + else: + # Log warning if data point not found but don't fail the pipeline + logger = get_logger(__name__) + logger.warning( + f"Data point with ID {data_id} not found in database, skipping pipeline status update" + ) yield { "run_info": PipelineRunCompleted( diff --git a/cognee/tasks/ingestion/plugin_ingest_data.py b/cognee/tasks/ingestion/plugin_ingest_data.py index aefdf4924..c038bbb7b 100644 --- a/cognee/tasks/ingestion/plugin_ingest_data.py +++ b/cognee/tasks/ingestion/plugin_ingest_data.py @@ -227,6 +227,18 @@ async def plugin_ingest_data( if "_" in content_identifier: return content_identifier.split("_", 1)[-1] return content_identifier + elif field_name == "file_size": + # Get file size from metadata or filesystem + if "file_size" in metadata: + return metadata["file_size"] + elif file_path: + import os + + try: + return os.path.getsize(file_path) + except (OSError, TypeError): + return None + return None return default_value @@ -275,6 +287,9 @@ async def plugin_ingest_data( content_hash=get_metadata_field(file_metadata, "content_hash"), external_metadata=ext_metadata, node_set=json.dumps(node_set) if node_set else None, + data_size=get_metadata_field(file_metadata, "file_size"), + tenant_id=user.tenant_id if user.tenant_id else None, + pipeline_status={}, token_count=-1, )