From 67b61ff964a3cd9a8ff2c15e894cece8450ce116 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Thu, 10 Jul 2025 22:20:30 +0200 Subject: [PATCH] fix: Resolve code graph pipeline issue --- cognee/api/v1/cognify/code_graph_pipeline.py | 4 +- .../modules/pipelines/operations/run_tasks.py | 97 ++++++++++--------- .../get_repo_file_dependencies.py | 3 + 3 files changed, 55 insertions(+), 49 deletions(-) diff --git a/cognee/api/v1/cognify/code_graph_pipeline.py b/cognee/api/v1/cognify/code_graph_pipeline.py index 00a0d3dc9..0da286c4b 100644 --- a/cognee/api/v1/cognify/code_graph_pipeline.py +++ b/cognee/api/v1/cognify/code_graph_pipeline.py @@ -79,7 +79,9 @@ async def run_code_graph_pipeline(repo_path, include_docs=False): async for run_status in non_code_pipeline_run: yield run_status - async for run_status in run_tasks(tasks, dataset.id, repo_path, user, "cognify_code_pipeline"): + async for run_status in run_tasks( + tasks, dataset.id, repo_path, user, "cognify_code_pipeline", incremental_loading=False + ): yield run_status diff --git a/cognee/modules/pipelines/operations/run_tasks.py b/cognee/modules/pipelines/operations/run_tasks.py index 4a65faf51..e7968176f 100644 --- a/cognee/modules/pipelines/operations/run_tasks.py +++ b/cognee/modules/pipelines/operations/run_tasks.py @@ -95,29 +95,29 @@ async def run_tasks( # TODO: Convert to async gather task instead of for loop (just make sure it can work there were some issues when async gathering datasets) for data_item in data: - # If data is being added to Cognee for the first time calculate the id of the data - if not isinstance(data_item, Data): - data = await resolve_data_directories(data) - file_path = await save_data_item_to_storage(data_item, dataset.name) - # Ingest data and add metadata - with open_data_file(file_path, s3fs=fs) as file: - classified_data = ingestion.classify(file, s3fs=fs) - # data_id is the hash of file contents + owner id to avoid duplicate data - data_id = ingestion.identify(classified_data, user) - else: - # If data was already processed by Cognee get data id - data_id = data_item.id - - # If incremental_loading is set to True don't process documents already processed by pipeline + # If incremental_loading of data is set to True don't process documents already processed by pipeline if incremental_loading: - # Check pipeline status, if Data already processed for pipeline before skip current processing - async with db_engine.get_async_session() as session: - data_point = ( - await session.execute(select(Data).filter(Data.id == data_id)) - ).scalar_one_or_none() - if data_point: - if data_point.pipeline_status.get(pipeline_name) == "Completed": - break + # If data is being added to Cognee for the first time calculate the id of the data + if not isinstance(data_item, Data): + data = await resolve_data_directories(data) + file_path = await save_data_item_to_storage(data_item, dataset.name) + # Ingest data and add metadata + with open_data_file(file_path, s3fs=fs) as file: + classified_data = ingestion.classify(file, s3fs=fs) + # data_id is the hash of file contents + owner id to avoid duplicate data + data_id = ingestion.identify(classified_data, user) + else: + # If data was already processed by Cognee get data id + data_id = data_item.id + + # Check pipeline status, if Data already processed for pipeline before skip current processing + async with db_engine.get_async_session() as session: + data_point = ( + await session.execute(select(Data).filter(Data.id == data_id)) + ).scalar_one_or_none() + if data_point: + if data_point.pipeline_status.get(pipeline_name) == "Completed": + break try: async for result in run_tasks_with_telemetry( @@ -134,23 +134,24 @@ async def run_tasks( payload=result, ) - data_items_pipeline_run_info[data_id] = { - "run_info": PipelineRunCompleted( - pipeline_run_id=pipeline_run_id, - dataset_id=dataset.id, - dataset_name=dataset.name, - ), - "data_id": data_id, - } + if incremental_loading: + data_items_pipeline_run_info[data_id] = { + "run_info": PipelineRunCompleted( + pipeline_run_id=pipeline_run_id, + dataset_id=dataset.id, + dataset_name=dataset.name, + ), + "data_id": data_id, + } - # Update pipeline status for Data element - async with db_engine.get_async_session() as session: - data_point = ( - await session.execute(select(Data).filter(Data.id == data_id)) - ).scalar_one_or_none() - data_point.pipeline_status[pipeline_name] = "Completed" - await session.merge(data_point) - await session.commit() + # Update pipeline status for Data element + async with db_engine.get_async_session() as session: + data_point = ( + await session.execute(select(Data).filter(Data.id == data_id)) + ).scalar_one_or_none() + data_point.pipeline_status[pipeline_name] = "Completed" + await session.merge(data_point) + await session.commit() except Exception as error: # Temporarily swallow error and try to process rest of documents first, then re-raise error at end of data ingestion pipeline @@ -158,16 +159,16 @@ async def run_tasks( logger.error( f"Exception caught while processing data: {error}.\n Data processing failed for data item: {data_item}." ) - - data_items_pipeline_run_info = { - "run_info": PipelineRunErrored( - pipeline_run_id=pipeline_run_id, - payload=error, - dataset_id=dataset.id, - dataset_name=dataset.name, - ), - "data_id": data_id, - } + if incremental_loading: + data_items_pipeline_run_info = { + "run_info": PipelineRunErrored( + pipeline_run_id=pipeline_run_id, + payload=error, + dataset_id=dataset.id, + dataset_name=dataset.name, + ), + "data_id": data_id, + } # re-raise error found during data ingestion if ingestion_error: diff --git a/cognee/tasks/repo_processor/get_repo_file_dependencies.py b/cognee/tasks/repo_processor/get_repo_file_dependencies.py index 232850936..b0cdb4402 100644 --- a/cognee/tasks/repo_processor/get_repo_file_dependencies.py +++ b/cognee/tasks/repo_processor/get_repo_file_dependencies.py @@ -103,6 +103,9 @@ async def get_repo_file_dependencies( extraction of dependencies (default is False). (default False) """ + if isinstance(repo_path, list) and len(repo_path) == 1: + repo_path = repo_path[0] + if not os.path.exists(repo_path): raise FileNotFoundError(f"Repository path {repo_path} does not exist.")