From 6f230c5a38d20bd85c49477553ed8a6146ed7cac Mon Sep 17 00:00:00 2001
From: hajdul88 <52442977+hajdul88@users.noreply.github.com>
Date: Mon, 25 Aug 2025 16:20:49 +0200
Subject: [PATCH] feature: adds environment_setup_and_checks general purpose layer to cognee_pipeline (#1283)

## Description
feature: adds environment_setup_and_checks general purpose layer to cognee_pipeline

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to the terms of the Topoteretes Developer Certificate of Origin.
---
 .../layers/environment_setup_and_checks.py    | 41 +++++++++++++++++++
 .../modules/pipelines/operations/pipeline.py  | 40 ++----------------
 2 files changed, 45 insertions(+), 36 deletions(-)
 create mode 100644 cognee/modules/pipelines/layers/environment_setup_and_checks.py

diff --git a/cognee/modules/pipelines/layers/environment_setup_and_checks.py b/cognee/modules/pipelines/layers/environment_setup_and_checks.py
new file mode 100644
index 000000000..bca8bc4c9
--- /dev/null
+++ b/cognee/modules/pipelines/layers/environment_setup_and_checks.py
@@ -0,0 +1,41 @@
+import asyncio
+from cognee.context_global_variables import (
+    graph_db_config as context_graph_db_config,
+    vector_db_config as context_vector_db_config,
+)
+
+from cognee.infrastructure.databases.relational import (
+    create_db_and_tables as create_relational_db_and_tables,
+)
+from cognee.infrastructure.databases.vector.pgvector import (
+    create_db_and_tables as create_pgvector_db_and_tables,
+)
+
+_first_run_done = False
+_first_run_lock = asyncio.Lock()
+
+
+async def environment_setup_and_checks(
+    vector_db_config: dict = None,
+    graph_db_config: dict = None,
+):
+    if vector_db_config:
+        context_vector_db_config.set(vector_db_config)
+    if graph_db_config:
+        context_graph_db_config.set(graph_db_config)
+
+    # Create tables for databases
+    await create_relational_db_and_tables()
+    await create_pgvector_db_and_tables()
+
+    global _first_run_done
+    async with _first_run_lock:
+        if not _first_run_done:
+            from cognee.infrastructure.llm.utils import (
+                test_llm_connection,
+                test_embedding_connection,
+            )
+
+            await test_llm_connection()
+            await test_embedding_connection()
+            _first_run_done = True
diff --git a/cognee/modules/pipelines/operations/pipeline.py b/cognee/modules/pipelines/operations/pipeline.py
index e52441101..2a21f4b9e 100644
--- a/cognee/modules/pipelines/operations/pipeline.py
+++ b/cognee/modules/pipelines/operations/pipeline.py
@@ -2,6 +2,9 @@ import asyncio
 from uuid import UUID
 from typing import Union
 
+from cognee.modules.pipelines.layers.environment_setup_and_checks import (
+    environment_setup_and_checks,
+)
 from cognee.shared.logging_utils import get_logger
 from cognee.modules.data.methods.get_dataset_data import get_dataset_data
 from cognee.modules.data.models import Data, Dataset
@@ -28,17 +31,6 @@ from cognee.modules.pipelines.models.PipelineRunInfo import (
     PipelineRunStarted,
 )
 
-from cognee.infrastructure.databases.relational import (
-    create_db_and_tables as create_relational_db_and_tables,
-)
-from cognee.infrastructure.databases.vector.pgvector import (
-    create_db_and_tables as create_pgvector_db_and_tables,
-)
-from cognee.context_global_variables import (
-    graph_db_config as context_graph_db_config,
-    vector_db_config as context_vector_db_config,
-)
-
 logger = get_logger("cognee.pipeline")
 
 update_status_lock = asyncio.Lock()
@@ -54,31 +46,7 @@ async def cognee_pipeline(
     graph_db_config: dict = None,
     incremental_loading: bool = False,
 ):
-    # Note: These context variables allow different value assignment for databases in Cognee
-    #       per async task, thread, process and etc.
-    if vector_db_config:
-        context_vector_db_config.set(vector_db_config)
-    if graph_db_config:
-        context_graph_db_config.set(graph_db_config)
-
-    # Create tables for databases
-    await create_relational_db_and_tables()
-    await create_pgvector_db_and_tables()
-
-    # Initialize first_run attribute if it doesn't exist
-    if not hasattr(cognee_pipeline, "first_run"):
-        cognee_pipeline.first_run = True
-
-    if cognee_pipeline.first_run:
-        from cognee.infrastructure.llm.utils import (
-            test_llm_connection,
-            test_embedding_connection,
-        )
-
-        # Test LLM and Embedding configuration once before running Cognee
-        await test_llm_connection()
-        await test_embedding_connection()
-        cognee_pipeline.first_run = False  # Update flag after first run
+    await environment_setup_and_checks(vector_db_config, graph_db_config)
 
     # If no user is provided use default user
     if user is None: