From 462fcef240b8d5e9e31ac9fe94f90daa14acee6d Mon Sep 17 00:00:00 2001 From: Rita Aleksziev Date: Fri, 6 Dec 2024 13:38:54 +0100 Subject: [PATCH] move config getter into cognee/modules/pipelines/operations/run_tasks.py and make the indentation a bit more readable --- .../modules/pipelines/operations/run_tasks.py | 34 +++++++++++-------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/cognee/modules/pipelines/operations/run_tasks.py b/cognee/modules/pipelines/operations/run_tasks.py index 52c81e12b..35e32cf74 100644 --- a/cognee/modules/pipelines/operations/run_tasks.py +++ b/cognee/modules/pipelines/operations/run_tasks.py @@ -161,25 +161,29 @@ async def run_tasks_base(tasks: list[Task], data = None, user: User = None): }) raise error -async def run_tasks_with_telemetry(tasks: list[Task], data, pipeline_name: str, config: dict): +async def run_tasks_with_telemetry(tasks: list[Task], data, pipeline_name: str): + + config = get_current_settings() + logger.debug("\nRunning pipeline with configuration:\n%s\n", json.dumps(config, indent = 1)) user = await get_default_user() - + try: logger.info("Pipeline run started: `%s`", pipeline_name) - send_telemetry("Pipeline Run Started", user.id, { - "pipeline_name": pipeline_name, - } | config - ) + send_telemetry("Pipeline Run Started", + user.id, + additional_properties = {"pipeline_name": pipeline_name, } | config + ) async for result in run_tasks_base(tasks, data, user): yield result logger.info("Pipeline run completed: `%s`", pipeline_name) - send_telemetry("Pipeline Run Completed", user.id, { - "pipeline_name": pipeline_name, - }) + send_telemetry("Pipeline Run Completed", + user.id, + additional_properties = {"pipeline_name": pipeline_name, } + ) except Exception as error: logger.error( "Pipeline run errored: `%s`\n%s\n", @@ -187,14 +191,14 @@ async def run_tasks_with_telemetry(tasks: list[Task], data, pipeline_name: str, str(error), exc_info = True, ) - send_telemetry("Pipeline Run Errored", user.id, { - "pipeline_name": pipeline_name, - }) + send_telemetry("Pipeline Run Errored", + user.id, + additional_properties = {"pipeline_name": pipeline_name, } | config + ) raise error async def run_tasks(tasks: list[Task], data = None, pipeline_name: str = "default_pipeline"): - config = get_current_settings() - - async for result in run_tasks_with_telemetry(tasks, data, pipeline_name, config): + + async for result in run_tasks_with_telemetry(tasks, data, pipeline_name): yield result